summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: antirez <antirez@gmail.com> 2018-06-15 13:14:57 +0200
committer: antirez <antirez@gmail.com> 2019-06-18 12:31:10 +0200
commit: e9bb30fd859ed4e9e3e6434207dedbc251086858 (patch)
tree: 0ac8972fefe6911dee8c7376e14c611fcc105e1c
parent: fd0ee469ab165d0e005e9fe1fca1c4f5c604cd56 (diff)
download: redis-new-keyspace.tar.gz
Experimental: new keyspace and expire algorithm. [branch: new-keyspace]
This is an alpha-quality implementation of a new keyspace representation and a new expire algorithm for Redis. This work is described here: https://gist.github.com/antirez/b2eb293819666ee104c7fcad71986eb7
-rw-r--r-- src/aof.c 69
-rw-r--r-- src/bio.c 2
-rw-r--r-- src/bitops.c 12
-rw-r--r-- src/blocked.c 2
-rw-r--r-- src/cluster.c 59
-rw-r--r-- src/config.c 12
-rw-r--r-- src/db.c 334
-rw-r--r-- src/debug.c 34
-rw-r--r-- src/defrag.c 2
-rw-r--r-- src/dict.c 37
-rw-r--r-- src/dict.h 43
-rw-r--r-- src/evict.c 39
-rw-r--r-- src/expire.c 452
-rw-r--r-- src/geo.c 10
-rw-r--r-- src/gopher.c 2
-rw-r--r-- src/hyperloglog.c 12
-rw-r--r-- src/latency.c 6
-rw-r--r-- src/lazyfree.c 30
-rw-r--r-- src/module.c 44
-rw-r--r-- src/object.c 53
-rw-r--r-- src/rdb.c 39
-rw-r--r-- src/rdb.h 5
-rw-r--r-- src/redis-benchmark.c 3
-rw-r--r-- src/redis-cli.c 12
-rw-r--r-- src/sentinel.c 12
-rw-r--r-- src/server.c 155
-rw-r--r-- src/server.h 71
-rw-r--r-- src/sort.c 8
-rw-r--r-- src/t_hash.c 24
-rw-r--r-- src/t_list.c 36
-rw-r--r-- src/t_set.c 32
-rw-r--r-- src/t_stream.c 28
-rw-r--r-- src/t_string.c 34
-rw-r--r-- src/t_zset.c 43
34 files changed, 998 insertions, 758 deletions
diff --git a/src/aof.c b/src/aof.c
index 4744847d2..b9ade76a4 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -923,7 +923,7 @@ int rioWriteBulkObject(rio *r, robj *obj) {
/* Emit the commands needed to rebuild a list object.
* The function returns 0 on error, 1 on success. */
-int rewriteListObject(rio *r, robj *key, robj *o) {
+int rewriteListObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = listTypeLength(o);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
@@ -937,7 +937,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
AOF_REWRITE_ITEMS_PER_CMD : items;
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (entry.value) {
@@ -957,7 +957,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
/* Emit the commands needed to rebuild a set object.
* The function returns 0 on error, 1 on success. */
-int rewriteSetObject(rio *r, robj *key, robj *o) {
+int rewriteSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = setTypeSize(o);
if (o->encoding == OBJ_ENCODING_INTSET) {
@@ -971,7 +971,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkLongLong(r,llval) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
@@ -989,7 +989,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
@@ -1004,7 +1004,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
/* Emit the commands needed to rebuild a sorted set object.
* The function returns 0 on error, 1 on success. */
-int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
+int rewriteSortedSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = zsetLength(o);
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -1030,7 +1030,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,score) == 0) return 0;
if (vstr != NULL) {
@@ -1057,7 +1057,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,*score) == 0) return 0;
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
@@ -1099,7 +1099,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
/* Emit the commands needed to rebuild a hash object.
* The function returns 0 on error, 1 on success. */
-int rewriteHashObject(rio *r, robj *key, robj *o) {
+int rewriteHashObject(rio *r, rkey *key, robj *o) {
hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o);
@@ -1111,7 +1111,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
@@ -1140,14 +1140,14 @@ int rioWriteBulkStreamID(rio *r,streamID *id) {
* add the message described by 'nack' having the id 'rawid', into the pending
* list of the specified consumer. All this in the context of the specified
* key and group. */
-int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
+int rioWriteStreamPendingEntry(rio *r, rkey *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
/* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid,&id);
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
@@ -1163,7 +1163,7 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t
/* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */
-int rewriteStreamObject(rio *r, robj *key, robj *o) {
+int rewriteStreamObject(rio *r, rkey *key, robj *o) {
stream *s = o->ptr;
streamIterator si;
streamIteratorStart(&si,s,NULL,NULL,0);
@@ -1179,7 +1179,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
@@ -1195,7 +1195,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* for the Stream type. */
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
@@ -1207,7 +1207,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
@@ -1222,7 +1222,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
@@ -1262,12 +1262,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
-int rewriteModuleObject(rio *r, robj *key, robj *o) {
+int rewriteModuleObject(rio *r, rkey *key, robj *o) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,r,key);
- mt->aof_rewrite(&io,key,mv->value);
+ robj *keyname = createStringObject(key->name,key->len);
+ moduleInitIOContext(io,mt,r,keyname);
+ mt->aof_rewrite(&io,keyname,mv->value);
+ decrRefCount(keyname);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
@@ -1309,15 +1311,14 @@ int rewriteAppendOnlyFileRio(rio *aof) {
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- sds keystr;
- robj key, *o;
+ rkey *key;
+ robj *o;
long long expiretime;
- keystr = dictGetKey(de);
+ key = dictGetKey(de);
o = dictGetVal(de);
- initStaticStringObject(key,keystr);
- expiretime = getExpire(db,&key);
+ expiretime = getExpire(key);
/* Save the key and associated value */
if (o->type == OBJ_STRING) {
@@ -1325,20 +1326,21 @@ int rewriteAppendOnlyFileRio(rio *aof) {
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
- if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkString(aof,key->name,key->len) == 0)
+ goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
- if (rewriteListObject(aof,&key,o) == 0) goto werr;
+ if (rewriteListObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
- if (rewriteSetObject(aof,&key,o) == 0) goto werr;
+ if (rewriteSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
- if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
+ if (rewriteSortedSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
- if (rewriteHashObject(aof,&key,o) == 0) goto werr;
+ if (rewriteHashObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
- if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
+ if (rewriteStreamObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
- if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
+ if (rewriteModuleObject(aof,key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
@@ -1346,7 +1348,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
- if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkString(aof,key->name,key->len) == 0)
+ goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */
diff --git a/src/bio.c b/src/bio.c
index 2af684570..749727e79 100644
--- a/src/bio.c
+++ b/src/bio.c
@@ -85,7 +85,7 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
-void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
+void lazyfreeFreeDatabaseFromBioThread(dict *ht, rax *tree);
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);
/* Make sure we have enough stack to perform all the things we do in the
diff --git a/src/bitops.c b/src/bitops.c
index ee1ce0460..a51538516 100644
--- a/src/bitops.c
+++ b/src/bitops.c
@@ -477,7 +477,7 @@ int getBitfieldTypeFromArgument(client *c, robj *o, int *sign, int *bits) {
* an error is sent to the client. */
robj *lookupStringForBitCommand(client *c, size_t maxbit) {
size_t byte = maxbit >> 3;
- robj *o = lookupKeyWrite(c->db,c->argv[1]);
+ robj *o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o == NULL) {
o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
@@ -571,7 +571,7 @@ void getbitCommand(client *c) {
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
byte = bitoffset >> 3;
@@ -625,7 +625,7 @@ void bitopCommand(client *c) {
len = zmalloc(sizeof(long) * numkeys);
objects = zmalloc(sizeof(robj*) * numkeys);
for (j = 0; j < numkeys; j++) {
- o = lookupKeyRead(c->db,c->argv[j+3]);
+ o = lookupKeyRead(c->db,c->argv[j+3],NULL);
/* Handle non-existing keys as empty strings. */
if (o == NULL) {
objects[j] = NULL;
@@ -773,7 +773,7 @@ void bitcountCommand(client *c) {
char llbuf[LONG_STR_SIZE];
/* Lookup, check for type, and return 0 for non existing keys. */
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
p = getObjectReadOnlyString(o,&strlen,llbuf);
@@ -834,7 +834,7 @@ void bitposCommand(client *c) {
/* If the key does not exist, from our point of view it is an infinite
* array of 0 bits. If the user is looking for the fist clear bit return 0,
* If the user is looking for the first set bit, return -1. */
- if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+ if ((o = lookupKeyRead(c->db,c->argv[1],NULL)) == NULL) {
addReplyLongLong(c, bit ? -1 : 0);
return;
}
@@ -993,7 +993,7 @@ void bitfieldCommand(client *c) {
if (readonly) {
/* Lookup for read is ok if key doesn't exit, but errors
* if it's not a string. */
- o = lookupKeyRead(c->db,c->argv[1]);
+ o = lookupKeyRead(c->db,c->argv[1],NULL);
if (o != NULL && checkType(c,o,OBJ_STRING)) {
zfree(ops);
return;
diff --git a/src/blocked.c b/src/blocked.c
index 1db657869..c4162e1e7 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -270,7 +270,7 @@ void handleClientsBlockedOnKeys(void) {
dictDelete(rl->db->ready_keys,rl->key);
/* Serve clients blocked on list key. */
- robj *o = lookupKeyWrite(rl->db,rl->key);
+ robj *o = lookupKeyWrite(rl->db,rl->key,NULL);
if (o != NULL && o->type == OBJ_LIST) {
dictEntry *de;
diff --git a/src/cluster.c b/src/cluster.c
index c85e3791d..505c3c757 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4776,7 +4776,7 @@ NULL
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
-void createDumpPayload(rio *payload, robj *o, robj *key) {
+void createDumpPayload(rio *payload, robj *o, rkey *key) {
unsigned char buf[2];
uint64_t crc;
@@ -4833,16 +4833,17 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
robj *o, *dumpobj;
+ rkey *key;
rio payload;
/* Check if the key is here. */
- if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+ if ((o = lookupKeyRead(c->db,c->argv[1],&key)) == NULL) {
addReplyNull(c);
return;
}
/* Create the DUMP encoded representation. */
- createDumpPayload(&payload,o,c->argv[1]);
+ createDumpPayload(&payload,o,key);
/* Transfer to the client */
dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
@@ -4893,7 +4894,7 @@ void restoreCommand(client *c) {
}
/* Make sure this key does not already exist here... */
- if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+ if (!replace && lookupKeyWrite(c->db,c->argv[1],NULL) != NULL) {
addReply(c,shared.busykeyerr);
return;
}
@@ -4925,12 +4926,12 @@ void restoreCommand(client *c) {
if (replace) dbDelete(c->db,c->argv[1]);
/* Create the key and set the TTL if any */
- dbAdd(c->db,c->argv[1],obj);
+ rkey *key = dbAdd(c->db,c->argv[1],obj);
if (ttl) {
if (!absttl) ttl+=mstime();
- setExpire(c,c->db,c->argv[1],ttl);
+ setExpire(c,c->db,key,ttl);
}
- objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
+ objectSetLRUOrLFU(key,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
@@ -5066,7 +5067,8 @@ void migrateCommand(client *c) {
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
- robj **kv = NULL; /* Key names. */
+ robj **knv = NULL; /* Key names. */
+ rkey **kov = NULL; /* Key objects. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
rio cmd, payload;
int may_retry = 1;
@@ -5121,18 +5123,21 @@ void migrateCommand(client *c) {
* this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
- kv = zrealloc(kv,sizeof(robj*)*num_keys);
+ knv = zrealloc(knv,sizeof(robj*)*num_keys);
+ kov = zrealloc(kov,sizeof(rkey*)*num_keys);
int oi = 0;
for (j = 0; j < num_keys; j++) {
- if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
- kv[oi] = c->argv[first_key+j];
+ rkey *ko;
+ if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j],&ko)) != NULL) {
+ knv[oi] = c->argv[first_key+j];
+ kov[oi] = ko;
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
@@ -5143,7 +5148,7 @@ try_again:
/* Connect */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
return; /* error sent to the client by migrateGetSocket() */
}
@@ -5173,7 +5178,7 @@ try_again:
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
- long long expireat = getExpire(c->db,kv[j]);
+ long long expireat = getExpire(kov[j]);
if (expireat != -1) {
ttl = expireat-mstime();
@@ -5186,7 +5191,9 @@ try_again:
/* Relocate valid (non expired) keys into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
- kv[non_expired++] = kv[j];
+ knv[non_expired] = knv[j];
+ kov[non_expired] = kov[j];
+ non_expired++;
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
@@ -5196,14 +5203,14 @@ try_again:
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
- serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
- serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
- sdslen(kv[j]->ptr)));
+ serverAssertWithInfo(c,NULL,sdsEncodedObject(knv[j]));
+ serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,knv[j]->ptr,
+ sdslen(knv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
- createDumpPayload(&payload,ov[j],kv[j]);
+ createDumpPayload(&payload,ov[j],kov[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
@@ -5283,13 +5290,13 @@ try_again:
} else {
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
- dbDelete(c->db,kv[j]);
- signalModifiedKey(c->db,kv[j]);
+ dbDelete(c->db,knv[j]);
+ signalModifiedKey(c->db,knv[j]);
server.dirty++;
/* Populate the argument vector to replace the old one. */
- newargv[del_idx++] = kv[j];
- incrRefCount(kv[j]);
+ newargv[del_idx++] = knv[j];
+ incrRefCount(knv[j]);
}
}
}
@@ -5347,7 +5354,7 @@ try_again:
}
sdsfree(cmd.io.buffer.ptr);
- zfree(ov); zfree(kv); zfree(newargv);
+ zfree(ov); zfree(knv); zfree(kov); zfree(newargv);
return;
/* On socket errors we try to close the cached socket and try again.
@@ -5374,7 +5381,7 @@ socket_err:
}
/* Cleanup we want to do if no retry is attempted. */
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
addReplySds(c,
sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance\r\n",
@@ -5554,7 +5561,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Migarting / Improrting slot? Count keys we don't have. */
if ((migrating_slot || importing_slot) &&
- lookupKeyRead(&server.db[0],thiskey) == NULL)
+ lookupKeyRead(&server.db[0],thiskey,NULL) == NULL)
{
missing_keys++;
}
diff --git a/src/config.c b/src/config.c
index 7f0e9af89..9e509b659 100644
--- a/src/config.c
+++ b/src/config.c
@@ -1650,19 +1650,23 @@ void dictListDestructor(void *privdata, void *val);
void rewriteConfigSentinelOption(struct rewriteConfigState *state);
dictType optionToLineDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* store hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
dictType optionSetDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* store hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
diff --git a/src/db.c b/src/db.c
index b537a29a4..101eb1b8a 100644
--- a/src/db.c
+++ b/src/db.c
@@ -38,24 +38,26 @@
* C-level DB API
*----------------------------------------------------------------------------*/
-int keyIsExpired(redisDb *db, robj *key);
+int keyIsExpired(rkey *key);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
-void updateLFU(robj *val) {
- unsigned long counter = LFUDecrAndReturn(val);
+void updateLFU(rkey *key) {
+ unsigned long counter = LFUDecrAndReturn(key);
counter = LFULogIncr(counter);
- val->lru = (LFUGetTimeInMinutes()<<8) | counter;
+ key->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
-robj *lookupKey(redisDb *db, robj *key, int flags) {
- dictEntry *de = dictFind(db->dict,key->ptr);
+robj *lookupKey(redisDb *db, robj *keyname, rkey **keyptr, int flags) {
+ dictEntry *de = dictFind(db->dict,keyname->ptr);
if (de) {
+ rkey *key = dictGetKey(de);
robj *val = dictGetVal(de);
+ if (keyptr) *keyptr = key;
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
@@ -65,9 +67,9 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- updateLFU(val);
+ updateLFU(key);
} else {
- val->lru = LRU_CLOCK();
+ key->lru = LRU_CLOCK();
}
}
return val;
@@ -98,16 +100,18 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
* for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */
-robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
+robj *lookupKeyReadWithFlags(redisDb *db, robj *keyname, rkey **keyptr, int flags) {
robj *val;
+ rkey *keyobj;
- if (expireIfNeeded(db,key) == 1) {
+ val = lookupKey(db,keyname,&keyobj,flags);
+ if (val && expireIfNeeded(db,keyname,keyobj) == 1) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) {
server.stat_keyspace_misses++;
- notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
+ notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id);
return NULL;
}
@@ -129,24 +133,24 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
server.current_client->cmd->flags & CMD_READONLY)
{
server.stat_keyspace_misses++;
- notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
+ notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id);
return NULL;
}
}
- val = lookupKey(db,key,flags);
if (val == NULL) {
server.stat_keyspace_misses++;
- notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
+ notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id);
}
else
server.stat_keyspace_hits++;
+ if (keyptr) *keyptr = keyobj;
return val;
}
/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
* common case. */
-robj *lookupKeyRead(redisDb *db, robj *key) {
- return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
+robj *lookupKeyRead(redisDb *db, robj *keyname, rkey **keyptr) {
+ return lookupKeyReadWithFlags(db,keyname,keyptr,LOOKUP_NONE);
}
/* Lookup a key for write operations, and as a side effect, if needed, expires
@@ -154,36 +158,64 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
-robj *lookupKeyWrite(redisDb *db, robj *key) {
- expireIfNeeded(db,key);
- return lookupKey(db,key,LOOKUP_NONE);
+robj *lookupKeyWrite(redisDb *db, robj *keyname, rkey **keyptr) {
+ rkey *key;
+ robj *val = lookupKey(db,keyname,&key,LOOKUP_NONE);
+ if (val && expireIfNeeded(db,keyname,key)) return NULL;
+ if (keyptr) *keyptr = key;
+ return val;
}
-robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
- robj *o = lookupKeyRead(c->db, key);
+robj *lookupKeyReadOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) {
+ robj *o = lookupKeyRead(c->db, keyname, keyptr);
if (!o) addReply(c,reply);
return o;
}
-robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
- robj *o = lookupKeyWrite(c->db, key);
+robj *lookupKeyWriteOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) {
+ robj *o = lookupKeyWrite(c->db, keyname, keyptr);
if (!o) addReply(c,reply);
return o;
}
+/* Allocate and return a key object. Key objects are used to represent
+ * keys in the main Redis dictionaries that associate key names to
+ * value objects. */
+rkey *createKey(const char *keyname, size_t keylen) {
+ rkey *k = zcalloc(sizeof(*k) + keylen + 1);
+ k->len = keylen;
+ memcpy(k->name,keyname,keylen);
+ k->name[keylen] = '\0';
+ return k;
+}
+
+void freeKey(rkey *key) {
+ zfree(key);
+}
+
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
*
* The program is aborted if the key already exists. */
-void dbAdd(redisDb *db, robj *key, robj *val) {
- sds copy = sdsdup(key->ptr);
- int retval = dictAdd(db->dict, copy, val);
+rkey *dbAdd(redisDb *db, robj *key, robj *val) {
+ rkey *ko = createKey(key->ptr,sdslen(key->ptr));
+ int retval = dictAdd(db->dict, ko, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET)
signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
+
+
+ /* Set the LRU to the current lruclock (minutes resolution), or
+ * alternatively the LFU counter. */
+ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+ ko->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
+ } else {
+ ko->lru = LRU_CLOCK();
+ }
+ return ko;
}
/* Overwrite an existing key with a new value. Incrementing the reference
@@ -191,15 +223,13 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present. */
-void dbOverwrite(redisDb *db, robj *key, robj *val) {
- dictEntry *de = dictFind(db->dict,key->ptr);
+rkey *dbOverwrite(redisDb *db, robj *keyname, robj *val) {
+ dictEntry *de = dictFind(db->dict,keyname->ptr);
- serverAssertWithInfo(NULL,key,de != NULL);
+ serverAssertWithInfo(NULL,keyname,de != NULL);
dictEntry auxentry = *de;
robj *old = dictGetVal(de);
- if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- val->lru = old->lru;
- }
+ rkey *key = dictGetKey(de);
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
@@ -208,6 +238,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
}
dictFreeVal(db->dict, &auxentry);
+ return key;
}
/* High level Set operation. This function can be used in order to set
@@ -217,16 +248,20 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent).
*
- * All the new keys in the database should be created via this interface. */
-void setKey(redisDb *db, robj *key, robj *val) {
- if (lookupKeyWrite(db,key) == NULL) {
- dbAdd(db,key,val);
+ * All the new keys in the database should be created via this interface.
+ * The function returns a pointer to the key object representing the key
+ * in the database dictionary. */
+rkey *setKey(redisDb *db, robj *keyname, robj *val) {
+ rkey *key;
+ if (lookupKeyWrite(db,keyname,&key) == NULL) {
+ key = dbAdd(db,keyname,val);
} else {
- dbOverwrite(db,key,val);
+ key = dbOverwrite(db,keyname,val);
}
incrRefCount(val);
removeExpire(db,key);
- signalModifiedKey(db,key);
+ signalModifiedKey(db,keyname);
+ return key;
}
int dbExists(redisDb *db, robj *key) {
@@ -240,18 +275,18 @@ int dbExists(redisDb *db, robj *key) {
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
- int allvolatile = dictSize(db->dict) == dictSize(db->expires);
+ int allvolatile = dictSize(db->dict) == raxSize(db->expires);
while(1) {
- sds key;
- robj *keyobj;
+ rkey *key;
+ robj *keyname;
de = dictGetFairRandomKey(db->dict);
if (de == NULL) return NULL;
key = dictGetKey(de);
- keyobj = createStringObject(key,sdslen(key));
- if (dictFind(db->expires,key)) {
+ keyname = createStringObject(key->name,key->len);
+ if (key->flags & KEY_FLAG_EXPIRE) {
if (allvolatile && server.masterhost && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
@@ -261,28 +296,27 @@ robj *dbRandomKey(redisDb *db) {
* To prevent the infinite loop we do some tries, but if there
* are the conditions for an infinite loop, eventually we
* return a key name that may be already expired. */
- return keyobj;
+ return keyname;
}
- if (expireIfNeeded(db,keyobj)) {
- decrRefCount(keyobj);
+ if (expireIfNeeded(db,keyname,key)) {
+ decrRefCount(keyname);
continue; /* search for another key. This expired. */
}
}
- return keyobj;
+ return keyname;
}
}
-/* Delete a key, value, and associated expiration entry if any, from the DB */
-int dbSyncDelete(redisDb *db, robj *key) {
- /* Deleting an entry from the expires dict will not free the sds of
- * the key, because it is shared with the main dictionary. */
- if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
- if (dictDelete(db->dict,key->ptr) == DICT_OK) {
- if (server.cluster_enabled) slotToKeyDel(key);
- return 1;
- } else {
- return 0;
- }
+/* Delete a key, value, and associated expiration entry if any, from the DB.
+ * The function returns 1 if the key existed and was removed, 0 otherwise.
+int dbSyncDelete(redisDb *db, robj *keyname) {
+ dictEntry *de = dictUnlink(db->dict,keyname->ptr);
+ if (de == NULL) return 0;
+ rkey *key = dictGetKey(de);
+ if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key);
+ if (server.cluster_enabled) slotToKeyDel(keyname);
+ dictFreeUnlinkedEntry(db->dict,de);
+ return 1;
}
/* This is a wrapper whose behavior depends on the Redis lazy free
@@ -312,7 +346,7 @@ int dbDelete(redisDb *db, robj *key) {
* The object 'o' is what the caller already obtained by looking up 'key'
* in 'db', the usage pattern looks like this:
*
- * o = lookupKeyWrite(db,key);
+ * o = lookupKeyWrite(db,key,NULL);
* if (checkType(c,o,OBJ_STRING)) return;
* o = dbUnshareStringValue(db,key,o);
*
@@ -367,7 +401,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
emptyDbAsync(&server.db[j]);
} else {
dictEmpty(server.db[j].dict,callback);
- dictEmpty(server.db[j].expires,callback);
+ raxFreeWithCallback(server.db[j].expires,callback);
}
}
if (server.cluster_enabled) {
@@ -471,7 +505,7 @@ void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
- expireIfNeeded(c->db,c->argv[j]);
+ expireIfNeededByName(c->db,c->argv[j]);
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
@@ -500,7 +534,7 @@ void existsCommand(client *c) {
int j;
for (j = 1; j < c->argc; j++) {
- if (lookupKeyRead(c->db,c->argv[j])) count++;
+ if (lookupKeyRead(c->db,c->argv[j],NULL)) count++;
}
addReplyLongLong(c,count);
}
@@ -546,16 +580,13 @@ void keysCommand(client *c) {
di = dictGetSafeIterator(c->db->dict);
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
while((de = dictNext(di)) != NULL) {
- sds key = dictGetKey(de);
- robj *keyobj;
+ rkey *key = dictGetKey(de);
- if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
- keyobj = createStringObject(key,sdslen(key));
- if (!keyIsExpired(c->db,keyobj)) {
- addReplyBulk(c,keyobj);
+ if (allkeys || stringmatchlen(pattern,plen,key->name,key->len,0)) {
+ if (!keyIsExpired(key)) {
+ addReplyBulkCBuffer(c,key->name,key->len);
numkeys++;
}
- decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
@@ -571,8 +602,8 @@ void scanCallback(void *privdata, const dictEntry *de) {
robj *key, *val = NULL;
if (o == NULL) {
- sds sdskey = dictGetKey(de);
- key = createStringObject(sdskey, sdslen(sdskey));
+ rkey *ko = dictGetKey(de);
+ key = createStringObject(ko->name, ko->len);
} else if (o->type == OBJ_SET) {
sds keysds = dictGetKey(de);
key = createStringObject(keysds,sdslen(keysds));
@@ -760,7 +791,12 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
}
/* Filter element if it is an expired key. */
- if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
+ if (!filter &&
+ o == NULL &&
+ expireIfNeededByName(c->db,kobj))
+ {
+ filter = 1;
+ }
/* Remove the element and its associted value if needed. */
if (filter) {
@@ -819,7 +855,7 @@ void typeCommand(client *c) {
robj *o;
char *type;
- o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
+ o = lookupKeyReadWithFlags(c->db,c->argv[1],NULL,LOOKUP_NOTOUCH);
if (o == NULL) {
type = "none";
} else {
@@ -870,6 +906,7 @@ void shutdownCommand(client *c) {
void renameGenericCommand(client *c, int nx) {
robj *o;
+ rkey *key;
long long expire;
int samekey = 0;
@@ -877,7 +914,7 @@ void renameGenericCommand(client *c, int nx) {
* if the key exists, however we still return an error on unexisting key. */
if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],&key,shared.nokeyerr)) == NULL)
return;
if (samekey) {
@@ -886,8 +923,8 @@ void renameGenericCommand(client *c, int nx) {
}
incrRefCount(o);
- expire = getExpire(c->db,c->argv[1]);
- if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
+ expire = getExpire(key);
+ if (lookupKeyWrite(c->db,c->argv[2],NULL) != NULL) {
if (nx) {
decrRefCount(o);
addReply(c,shared.czero);
@@ -897,8 +934,8 @@ void renameGenericCommand(client *c, int nx) {
* with the same name. */
dbDelete(c->db,c->argv[2]);
}
- dbAdd(c->db,c->argv[2],o);
- if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
+ key = dbAdd(c->db,c->argv[2],o);
+ if (expire != -1) setExpire(c,c->db,key,expire);
dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
@@ -951,20 +988,21 @@ void moveCommand(client *c) {
}
/* Check if the element exists and get a reference */
- o = lookupKeyWrite(c->db,c->argv[1]);
+ rkey *key;
+ o = lookupKeyWrite(c->db,c->argv[1],&key);
if (!o) {
addReply(c,shared.czero);
return;
}
- expire = getExpire(c->db,c->argv[1]);
+ expire = getExpire(key);
/* Return zero if the key already exists in the target DB */
- if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
+ if (lookupKeyWrite(dst,c->argv[1],NULL) != NULL) {
addReply(c,shared.czero);
return;
}
- dbAdd(dst,c->argv[1],o);
- if (expire != -1) setExpire(c,dst,c->argv[1],expire);
+ key = dbAdd(dst,c->argv[1],o);
+ if (expire != -1) setExpire(c,dst,key,expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
@@ -982,7 +1020,7 @@ void scanDatabaseForReadyLists(redisDb *db) {
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
- robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
+ robj *value = lookupKey(db,key,NULL,LOOKUP_NOTOUCH);
if (value && (value->type == OBJ_LIST ||
value->type == OBJ_STREAM ||
value->type == OBJ_ZSET))
@@ -1060,134 +1098,6 @@ void swapdbCommand(client *c) {
}
}
-/*-----------------------------------------------------------------------------
- * Expires API
- *----------------------------------------------------------------------------*/
-
-int removeExpire(redisDb *db, robj *key) {
- /* An expire may only be removed if there is a corresponding entry in the
- * main dict. Otherwise, the key will never be freed. */
- serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
- return dictDelete(db->expires,key->ptr) == DICT_OK;
-}
-
-/* Set an expire to the specified key. If the expire is set in the context
- * of an user calling a command 'c' is the client, otherwise 'c' is set
- * to NULL. The 'when' parameter is the absolute unix time in milliseconds
- * after which the key will no longer be considered valid. */
-void setExpire(client *c, redisDb *db, robj *key, long long when) {
- dictEntry *kde, *de;
-
- /* Reuse the sds from the main dict in the expire dict */
- kde = dictFind(db->dict,key->ptr);
- serverAssertWithInfo(NULL,key,kde != NULL);
- de = dictAddOrFind(db->expires,dictGetKey(kde));
- dictSetSignedIntegerVal(de,when);
-
- int writable_slave = server.masterhost && server.repl_slave_ro == 0;
- if (c && writable_slave && !(c->flags & CLIENT_MASTER))
- rememberSlaveKeyWithExpire(db,key);
-}
-
-/* Return the expire time of the specified key, or -1 if no expire
- * is associated with this key (i.e. the key is non volatile) */
-long long getExpire(redisDb *db, robj *key) {
- dictEntry *de;
-
- /* No expire? return ASAP */
- if (dictSize(db->expires) == 0 ||
- (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
-
- /* The entry was found in the expire dict, this means it should also
- * be present in the main dict (safety check). */
- serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
- return dictGetSignedIntegerVal(de);
-}
-
-/* Propagate expires into slaves and the AOF file.
- * When a key expires in the master, a DEL operation for this key is sent
- * to all the slaves and the AOF file if enabled.
- *
- * This way the key expiry is centralized in one place, and since both
- * AOF and the master->slave link guarantee operation ordering, everything
- * will be consistent even if we allow write operations against expiring
- * keys. */
-void propagateExpire(redisDb *db, robj *key, int lazy) {
- robj *argv[2];
-
- argv[0] = lazy ? shared.unlink : shared.del;
- argv[1] = key;
- incrRefCount(argv[0]);
- incrRefCount(argv[1]);
-
- if (server.aof_state != AOF_OFF)
- feedAppendOnlyFile(server.delCommand,db->id,argv,2);
- replicationFeedSlaves(server.slaves,db->id,argv,2);
-
- decrRefCount(argv[0]);
- decrRefCount(argv[1]);
-}
-
-/* Check if the key is expired. */
-int keyIsExpired(redisDb *db, robj *key) {
- mstime_t when = getExpire(db,key);
-
- if (when < 0) return 0; /* No expire for this key */
-
- /* Don't expire anything while loading. It will be done later. */
- if (server.loading) return 0;
-
- /* If we are in the context of a Lua script, we pretend that time is
- * blocked to when the Lua script started. This way a key can expire
- * only the first time it is accessed and not in the middle of the
- * script execution, making propagation to slaves / AOF consistent.
- * See issue #1525 on Github for more information. */
- mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
-
- return now > when;
-}
-
-/* This function is called when we are going to perform some operation
- * in a given key, but such key may be already logically expired even if
- * it still exists in the database. The main way this function is called
- * is via lookupKey*() family of functions.
- *
- * The behavior of the function depends on the replication role of the
- * instance, because slave instances do not expire keys, they wait
- * for DELs from the master for consistency matters. However even
- * slaves will try to have a coherent return value for the function,
- * so that read commands executed in the slave side will be able to
- * behave like if the key is expired even if still present (because the
- * master has yet to propagate the DEL).
- *
- * In masters as a side effect of finding a key which is expired, such
- * key will be evicted from the database. Also this may trigger the
- * propagation of a DEL/UNLINK command in AOF / replication stream.
- *
- * The return value of the function is 0 if the key is still valid,
- * otherwise the function returns 1 if the key is expired. */
-int expireIfNeeded(redisDb *db, robj *key) {
- if (!keyIsExpired(db,key)) return 0;
-
- /* If we are running in the context of a slave, instead of
- * evicting the expired key from the database, we return ASAP:
- * the slave key expiration is controlled by the master that will
- * send us synthesized DEL operations for expired keys.
- *
- * Still we try to return the right information to the caller,
- * that is, 0 if we think the key should be still valid, 1 if
- * we think the key is expired at this time. */
- if (server.masterhost != NULL) return 1;
-
- /* Delete the key */
- server.stat_expiredkeys++;
- propagateExpire(db,key,server.lazyfree_lazy_expire);
- notifyKeyspaceEvent(NOTIFY_EXPIRED,
- "expired",key,db->id);
- return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
- dbSyncDelete(db,key);
-}
-
/* -----------------------------------------------------------------------------
* API to get key arguments from commands
* ---------------------------------------------------------------------------*/
diff --git a/src/debug.c b/src/debug.c
index 0c6b5630c..e4dc12f22 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -118,10 +118,10 @@ void mixStringObjectDigest(unsigned char *digest, robj *o) {
* Note that this function does not reset the initial 'digest' passed, it
* will continue mixing this object digest to anything that was already
* present. */
-void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) {
+void xorObjectDigest(rkey *key, unsigned char *digest, robj *o) {
uint32_t aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux));
- long long expiretime = getExpire(db,keyobj);
+ long long expiretime = getExpire(key);
char buf[128];
/* Save the key and associated value */
@@ -277,21 +277,19 @@ void computeDatasetDigest(unsigned char *final) {
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- sds key;
- robj *keyobj, *o;
+ rkey *key;
+ robj *o;
memset(digest,0,20); /* This key-val digest */
key = dictGetKey(de);
- keyobj = createStringObject(key,sdslen(key));
- mixDigest(digest,key,sdslen(key));
+ mixDigest(digest,key->name,key->len);
o = dictGetVal(de);
- xorObjectDigest(db,keyobj,digest,o);
+ xorObjectDigest(key,digest,o);
/* We can finally xor the key-val digest to the final digest */
xorDigest(final,digest,20);
- decrRefCount(keyobj);
}
dictReleaseIterator(di);
}
@@ -385,6 +383,7 @@ NULL
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de;
+ rkey *key;
robj *val;
char *strenc;
@@ -392,6 +391,7 @@ NULL
addReply(c,shared.nokeyerr);
return;
}
+ key = dictGetKey(de);
val = dictGetVal(de);
strenc = strEncoding(val->encoding);
@@ -434,7 +434,7 @@ NULL
"lru:%d lru_seconds_idle:%llu%s",
(void*)val, val->refcount,
strenc, rdbSavedObjectLen(val),
- val->lru, estimateObjectIdleTime(val)/1000, extra);
+ key->lru, estimateObjectIdleTime(key)/1000, extra);
} else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) {
dictEntry *de;
robj *val;
@@ -463,7 +463,7 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"ziplist") && c->argc == 3) {
robj *o;
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,shared.nokeyerr))
== NULL) return;
if (o->encoding != OBJ_ENCODING_ZIPLIST) {
@@ -489,7 +489,7 @@ NULL
if (c->argc == 5)
if (getLongFromObjectOrReply(c, c->argv[4], &valsize, NULL) != C_OK)
return;
- if (lookupKeyWrite(c->db,key) != NULL) {
+ if (lookupKeyWrite(c->db,key,NULL) != NULL) {
decrRefCount(key);
continue;
}
@@ -521,8 +521,10 @@ NULL
for (int j = 2; j < c->argc; j++) {
unsigned char digest[20];
memset(digest,0,20); /* Start with a clean result */
- robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH);
- if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
+ rkey *key;
+ robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],&key,
+ LOOKUP_NOTOUCH);
+ if (o) xorObjectDigest(key,digest,o);
sds d = sdsempty();
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
@@ -634,16 +636,12 @@ NULL
dictGetStats(buf,sizeof(buf),server.db[dbid].dict);
stats = sdscat(stats,buf);
- stats = sdscatprintf(stats,"[Expires HT]\n");
- dictGetStats(buf,sizeof(buf),server.db[dbid].expires);
- stats = sdscat(stats,buf);
-
addReplyBulkSds(c,stats);
} else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) {
robj *o;
dict *ht = NULL;
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,shared.nokeyerr))
== NULL) return;
/* Get the hash table reference from the object, if possible. */
diff --git a/src/defrag.c b/src/defrag.c
index ecf0255dc..c578dad7c 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -775,7 +775,7 @@ long defragKey(redisDb *db, dictEntry *de) {
/* Dirty code:
* I can't search in db->expires for that key after i already released
* the pointer it holds it won't be able to do the string compare */
- uint64_t hash = dictGetHash(db->dict, de->key);
+ uint64_t hash = dictGetEntryHash(db->dict, de);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
}
diff --git a/src/dict.c b/src/dict.c
index 106467ef7..15821213a 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -206,7 +206,7 @@ int dictRehash(dict *d, int n) {
nextde = de->next;
/* Get the index in the new hash table */
- h = dictHashKey(d, de->key) & d->ht[1].sizemask;
+ h = dictHashStoredKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
@@ -299,7 +299,7 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
/* Get the index of the new element, or -1 if
* the element already exists. */
- if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
+ if ((index = _dictKeyIndex(d,key,dictHashStoredKey(d,key),existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
@@ -369,14 +369,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
- h = dictHashKey(d, key);
+ h = dictHashLookupKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL;
while(he) {
- if (key==he->key || dictCompareKeys(d, key, he->key)) {
+ if (key==he->key || dictCompareLookupKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
@@ -480,12 +480,12 @@ dictEntry *dictFind(dict *d, const void *key)
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
- h = dictHashKey(d, key);
+ h = dictHashLookupKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
- if (key==he->key || dictCompareKeys(d, key, he->key))
+ if (key==he->key || dictCompareLookupKeys(d, key, he->key))
return he;
he = he->next;
}
@@ -998,7 +998,7 @@ static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **e
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
- if (key==he->key || dictCompareKeys(d, key, he->key)) {
+ if (key==he->key || dictCompareStoredKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
@@ -1024,15 +1024,28 @@ void dictDisableResize(void) {
dict_can_resize = 0;
}
+/* Compute the hash of the specified key (using the lookup hash function)
+ * and return it to the caller. This is useful in order to later call
+ * dictFindEntryRefByPtrAndHash(). */
uint64_t dictGetHash(dict *d, const void *key) {
- return dictHashKey(d, key);
+ return dictHashLookupKey(d, key);
+}
+
+/* Compute the hash of the specified dict entry (using the stored keys hash
+ * function) and return it to the caller. This is useful in order to later
+ * call dictFindEntryRefByPtrAndHash(). */
+uint64_t dictGetEntryHash(dict *d, const dictEntry *de) {
+ return dictHashStoredKey(d, de->key);
}
/* Finds the dictEntry reference by using pointer and pre-calculated hash.
* oldkey is a dead pointer and should not be accessed.
- * the hash value should be provided using dictGetHash.
- * no string / key comparison is performed.
- * return value is the reference to the dictEntry if found, or NULL if not found. */
+ * the hash value should be provided using dictGetHash() or dictGetEntryHash()
+ * depending on which form of the key is at hand (the one used to look up the
+ * hash table, or the one stored in the dict entry).
+ * No string / key comparison is performed.
+ * Return value is the reference to the dictEntry if found, or NULL if not
+ * found. */
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
dictEntry *he, **heref;
unsigned long idx, table;
@@ -1160,9 +1173,11 @@ void freeCallback(void *privdata, void *val) {
dictType BenchmarkDictType = {
hashCallback,
+ hashCallback,
NULL,
NULL,
compareCallback,
+ compareCallback,
freeCallback,
NULL
};
diff --git a/src/dict.h b/src/dict.h
index dec60f637..251062a53 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -44,6 +44,8 @@
/* Unused arguments generate annoying warnings... */
#define DICT_NOTUSED(V) ((void) V)
+/* An entry (a key basically) as stored inside the hash table. Note that
+ * there is a 'next' pointer, since we use chaining to resolve conflicts. */
typedef struct dictEntry {
void *key;
union {
@@ -55,11 +57,33 @@ typedef struct dictEntry {
struct dictEntry *next;
} dictEntry;
+/* The hash table type.
+ *
+ * There are two sets of functions for key hashing and comparison: one set is
+ * used for looking up the key and the other set for storing it.
+ *
+ * Most of the times each set will be the same function pointer, but sometimes
+ * we want the ability to store a key in a given way inside the hash table,
+ * and lookup it in some other way without resorting to any kind of conversion.
+ * For instance the key may be stored as a structure also representing other
+ * things, but the lookup happens via just a pointer to a null terminated
+ * string. The dual hash design allows for such usage. In that case we'll have
+ * a lookupHashFunction that will expect a null terminated C string, and a
+ * storedHashFunction that will instead expect the structure.
+ * Similarly the two comparison functions will work differently. The
+ * lookupKeyCompare will treat the first argument as a pointer to a C string
+ * and the other as a structure (this way we can directly lookup the structure
+ * key using the C string). While the storedKeyCompare() will check if two
+ * pointers to the key in structure form are the same.
+ *
+ * Every actual hash table has a pointer to its type. */
typedef struct dictType {
- uint64_t (*hashFunction)(const void *key);
+ uint64_t (*lookupHashFunction)(const void *key);
+ uint64_t (*storedHashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
- int (*keyCompare)(void *privdata, const void *key1, const void *key2);
+ int (*lookupKeyCompare)(void *privdata, const void *key1, const void *key2);
+ int (*storedKeyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
@@ -132,12 +156,18 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
(entry)->key = (_key_); \
} while(0)
-#define dictCompareKeys(d, key1, key2) \
- (((d)->type->keyCompare) ? \
- (d)->type->keyCompare((d)->privdata, key1, key2) : \
+#define dictCompareLookupKeys(d, key1, key2) \
+ (((d)->type->lookupKeyCompare) ? \
+ (d)->type->lookupKeyCompare((d)->privdata, key1, key2) : \
+ (key1) == (key2))
+
+#define dictCompareStoredKeys(d, key1, key2) \
+ (((d)->type->storedKeyCompare) ? \
+ (d)->type->storedKeyCompare((d)->privdata, key1, key2) : \
(key1) == (key2))
-#define dictHashKey(d, key) (d)->type->hashFunction(key)
+#define dictHashLookupKey(d, key) (d)->type->lookupHashFunction(key)
+#define dictHashStoredKey(d, key) (d)->type->storedHashFunction(key)
#define dictGetKey(he) ((he)->key)
#define dictGetVal(he) ((he)->v.val)
#define dictGetSignedIntegerVal(he) ((he)->v.s64)
@@ -180,6 +210,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
+uint64_t dictGetEntryHash(dict *d, const dictEntry *de);
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
/* Hash table types */
diff --git a/src/evict.c b/src/evict.c
index 176f4c362..22f278645 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -87,12 +87,12 @@ unsigned int LRU_CLOCK(void) {
/* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */
-unsigned long long estimateObjectIdleTime(robj *o) {
+unsigned long long estimateObjectIdleTime(rkey *key) {
unsigned long long lruclock = LRU_CLOCK();
- if (lruclock >= o->lru) {
- return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
+ if (lruclock >= key->lru) {
+ return (lruclock - key->lru) * LRU_CLOCK_RESOLUTION;
} else {
- return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
+ return (lruclock + (LRU_CLOCK_MAX - key->lru)) *
LRU_CLOCK_RESOLUTION;
}
}
@@ -166,7 +166,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
- sds key;
+ rkey *key;
robj *o;
dictEntry *de;
@@ -185,7 +185,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
- idle = estimateObjectIdleTime(o);
+ idle = estimateObjectIdleTime(key);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
@@ -194,10 +194,10 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
- idle = 255-LFUDecrAndReturn(o);
+ idle = 255-LFUDecrAndReturn(key);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
- idle = ULLONG_MAX - (long)dictGetVal(de);
+ idle = key->expire;
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
@@ -243,9 +243,9 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimizbla bla bla bla. */
- int klen = sdslen(key);
+ int klen = key->len;
if (klen > EVPOOL_CACHED_SDS_SIZE) {
- pool[k].key = sdsdup(key);
+ pool[k].key = sdsnewlen(key->name,key->len);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
@@ -332,9 +332,9 @@ uint8_t LFULogIncr(uint8_t counter) {
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
-unsigned long LFUDecrAndReturn(robj *o) {
- unsigned long ldt = o->lru >> 8;
- unsigned long counter = o->lru & 255;
+unsigned long LFUDecrAndReturn(rkey *k) {
+ unsigned long ldt = k->lru >> 8;
+ unsigned long counter = k->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
@@ -488,8 +488,13 @@ int freeMemoryIfNeeded(void) {
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
+ #if 0
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
+ #else
+ dict = db->dict;
+ #warning "Implement the volatile policy correctly"
+ #endif
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
@@ -506,8 +511,11 @@ int freeMemoryIfNeeded(void) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
+ #warning "Implement the volatile policy correctly"
+ #if 0
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
+ #endif
}
/* Remove the entry from the pool. */
@@ -538,8 +546,13 @@ int freeMemoryIfNeeded(void) {
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
+ #if 0
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
+ #else
+ #warning "Implement the volatile policy correctly"
+ dict = db->dict;
+ #endif
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
diff --git a/src/expire.c b/src/expire.c
index 0b92ee3fe..87c87bccd 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -38,9 +38,103 @@
* When keys are accessed they are expired on-access. However we need a
* mechanism in order to ensure keys are eventually removed when expired even
* if no access is performed on them.
+ *
+ * In order to accomplish this every key with an expire is represented in
+ * two data structures:
+ *
+ * 1. The main dictionary of keys, server.db[x]->dict, is an hash table that
+ * represents the keyspace of a given Redis database. The keys stored
+ * in the hash table are redisKey structures (typedef 'rkey'). When
+ * a key has an expire set, the key->flags have the KEY_FLAG_EXPIRE set,
+ * and the key->expire is populated with the milliseconds unix time at
+ * which the key will no longer be valid.
+ *
+ * 2. Redis also keeps a radix tree that is composed only of keys that have
+ * an expire set, lexicographically sorted by the expire time. Basically
+ * each key in the radix tree is composed as follows:
+ *
+ * [8 bytes expire unix time][8 bytes key object pointer]
+ *
+ * Such tree is stored in server.db[x]->expires.
+ *
+ * The first field, the unix time, is the same stored in the key->expire of
+ * the corresponding key in the hash table, however it is stored in big endian
+ * so that sorting the time lexicographically in the tree, will also make the
+ * tree sorted by numerical expire time (from the smallest unix time to the
+ * greatest one).
+ *
+ * Then we store the key pointer, this time in native endianness, because how
+ * it is sorted does not matter, being after the unix time. If Redis is running
+ * as a 32 bit system, the last 4 bytes of the pointer are just zeroes, so
+ * we can assume a 16 bytes key in every architecture. Note that from the
+ * pointer we can retrieve the key name, lookup it in the main dictionary, and
+ * delete the key.
+ *
+ * On the other hand, when we modify the expire time of some key, we need to
+ * update the tree accordingly. At every expire cycle, what we need to do is
+ * conceptually very simple: we run the tree and expire keys as long as we
+ * find keys that are already logically expired (expire time < current time).
+ *
*----------------------------------------------------------------------------*/
-/* Helper function for the activeExpireCycle() function.
+#define EXPIRE_KEY_LEN 16 /* Key length in the radix tree of expires. */
+
+/* Populate the buffer 'buf', that should be at least EXPIRE_KEY_LEN bytes,
+ * with the radix tree key under which 'key' is stored in the expires tree.
+ * See the comment above for the format. */
+void encodeExpireKey(unsigned char *buf, rkey *key) {
+ uint64_t expire = htonu64(key->expire);
+ uint64_t ptr = (uint64_t) key; /* The pointer may be 32 bit, cast to 64. */
+ memcpy(buf,&expire,sizeof(expire));
+ memcpy(buf+8,&ptr,sizeof(ptr));
+}
+
+/* This is the reverse of encodeExpireKey(): given the key will return a
+ * pointer to an rkey and the expire value. */
+void decodeExpireKey(unsigned char *buf, uint64_t *expireptr, rkey **keyptrptr) {
+ uint64_t expire;
+ uint64_t keyptr;
+ memcpy(&expire,buf,sizeof(expire));
+ expire = ntohu64(expire);
+ memcpy(&keyptr,buf+8,sizeof(keyptr));
+ *expireptr = expire;
+ *keyptrptr = (rkey*)(unsigned long)keyptr;
+}
+
+/* Populate the expires radix tree with the specified key. */
+void addExpireToTree(redisDb *db, rkey *key) {
+ unsigned char expirekey[EXPIRE_KEY_LEN];
+ encodeExpireKey(expirekey,key);
+ int retval = raxTryInsert(db->expires,expirekey,EXPIRE_KEY_LEN,NULL,NULL);
+ serverAssert(retval != 0);
+}
+
+/* Remove the specified key from the expires radix tree. */
+void removeExpireFromTree(redisDb *db, rkey *key) {
+ unsigned char expirekey[EXPIRE_KEY_LEN];
+ encodeExpireKey(expirekey,key);
+ int retval = raxRemove(db->expires,expirekey,EXPIRE_KEY_LEN,NULL);
+ serverAssert(retval != 0);
+}
+
+/* Delete a key that is found expired by the expiration cycle. We need to
+ * propagate the key too, send the notification event, and take a few
+ * stats. */
+void deleteExpiredKey(redisDb *db, rkey *key) {
+ robj *keyname = createStringObject(key->name,key->len);
+
+ propagateExpire(db,keyname,server.lazyfree_lazy_expire);
+ if (server.lazyfree_lazy_expire)
+ dbAsyncDelete(db,keyname);
+ else
+ dbSyncDelete(db,keyname);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,
+ "expired",keyname,db->id);
+ decrRefCount(keyname);
+ server.stat_expiredkeys++;
+}
+
+/* Helper function for the expireSlaveKeys() function.
* This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database.
*
@@ -51,21 +145,10 @@
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
-int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
- long long t = dictGetSignedIntegerVal(de);
+int activeExpireCycleTryExpire(redisDb *db, rkey *key, long long now) {
+ long long t = key->expire;
if (now > t) {
- sds key = dictGetKey(de);
- robj *keyobj = createStringObject(key,sdslen(key));
-
- propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
- if (server.lazyfree_lazy_expire)
- dbAsyncDelete(db,keyobj);
- else
- dbSyncDelete(db,keyobj);
- notifyKeyspaceEvent(NOTIFY_EXPIRED,
- "expired",keyobj,db->id);
- decrRefCount(keyobj);
- server.stat_expiredkeys++;
+ deleteExpiredKey(db,key);
return 1;
} else {
return 0;
@@ -101,7 +184,8 @@ void activeExpireCycle(int type) {
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
- int j, iteration = 0;
+ int j;
+ unsigned long iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
@@ -129,25 +213,20 @@ void activeExpireCycle(int type) {
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
- /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
- * per iteration. Since this function gets called with a frequency of
- * server.hz times per second, the following is the max amount of
- * microseconds we can spend in this function. */
+ /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of
+ * CPU time per iteration. Since this function gets called with a
+ * frequency of server.hz times per second, the following is the max
+ * amount of microseconds we can spend in this function. */
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
+ /* If it's a fast cycle, override the time limit with our fixed
+ * time limit (defaults to 1 millisecond). */
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
- /* Accumulate some global stats as we expire keys, to have some idea
- * about the number of keys that are already logically expired, but still
- * existing inside the database. */
- long total_sampled = 0;
- long total_expired = 0;
-
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
- int expired;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
@@ -155,92 +234,59 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;
- /* Continue to expire if at the end of the cycle more than 25%
- * of the keys were expired. */
- do {
- unsigned long num, slots;
- long long now, ttl_sum;
- int ttl_samples;
+ /* If there is nothing to expire try next DB ASAP, avoiding the
+ * cost of seeking the radix tree iterator. */
+ if (raxSize(db->expires) == 0) continue;
+
+ /* The main collection cycle. Run the tree and expire keys that
+ * are found to be already logically expired. */
+ long long now = mstime();
+ raxIterator ri;
+ raxStart(&ri,db->expires);
+ raxSeek(&ri,"^",NULL,0);
+
+ /* Enter the loop expiring keys for this database. Inside this
+ * loop there are two stop conditions:
+ *
+ * 1. The time limit.
+ * 2. The loop will exit if in this DB there are no more keys
+ * that are logically expired.
+ *
+ * Moreover the loop naturally terminates when there are no longer
+ * elements in the radix tree. */
+ while(raxNext(&ri)) {
+ rkey *key;
+ uint64_t expire;
+ decodeExpireKey(ri.key,&expire,&key);
+
+ /* First stop condition: no keys to expire here. */
+ if (expire >= (uint64_t)now) break;
+
+ /* Expire the key: deleteExpiredKey() propagates the deletion,
+ * sends the keyspace notification and updates the stats. */
+ deleteExpiredKey(db,key);
+
+ /* Second stop condition: the time limit. */
iteration++;
-
- /* If there is nothing to expire try next DB ASAP. */
- if ((num = dictSize(db->expires)) == 0) {
- db->avg_ttl = 0;
- break;
- }
- slots = dictSlots(db->expires);
- now = mstime();
-
- /* When there are less than 1% filled slots getting random
- * keys is expensive, so stop here waiting for better times...
- * The dictionary will be resized asap. */
- if (num && slots > DICT_HT_INITIAL_SIZE &&
- (num*100/slots < 1)) break;
-
- /* The main collection cycle. Sample random keys among keys
- * with an expire set, checking for expired ones. */
- expired = 0;
- ttl_sum = 0;
- ttl_samples = 0;
-
- if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
- num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
-
- while (num--) {
- dictEntry *de;
- long long ttl;
-
- if ((de = dictGetRandomKey(db->expires)) == NULL) break;
- ttl = dictGetSignedIntegerVal(de)-now;
- if (activeExpireCycleTryExpire(db,de,now)) expired++;
- if (ttl > 0) {
- /* We want the average TTL of keys yet not expired. */
- ttl_sum += ttl;
- ttl_samples++;
- }
- total_sampled++;
- }
- total_expired += expired;
-
- /* Update the average TTL stats for this database. */
- if (ttl_samples) {
- long long avg_ttl = ttl_sum/ttl_samples;
-
- /* Do a simple running average with a few samples.
- * We just use the current estimate with a weight of 2%
- * and the previous estimate with a weight of 98%. */
- if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
- db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
- }
-
- /* We can't block forever here even if there are many keys to
- * expire. So after a given amount of milliseconds return to the
- * caller waiting for the other active expire cycle. */
- if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
- elapsed = ustime()-start;
+ /* Check the time limit only once every 256 expired keys, and
+ * refresh 'now' with the same clock sample. */
+ if ((iteration & 0xff) == 0) {
+ now = ustime();
+ elapsed = now-start;
+ now /= 1000; /* Convert back now to milliseconds. */
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
- /* We don't repeat the cycle if there are less than 25% of keys
- * found expired in the current DB. */
- } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
+ /* Reseek the iterator: the node we were on is now
+ * deleted. */
+ raxSeek(&ri,"^",NULL,0);
+ }
+ raxStop(&ri);
}
elapsed = ustime()-start;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
-
- /* Update our estimate of keys existing but yet to be expired.
- * Running average with this sample accounting for 5%. */
- double current_perc;
- if (total_sampled) {
- current_perc = (double)total_expired/total_sampled;
- } else
- current_perc = 0;
- server.stat_expired_stale_perc = (current_perc*0.05)+
- (server.stat_expired_stale_perc*0.95);
}
/*-----------------------------------------------------------------------------
@@ -269,8 +315,8 @@ void activeExpireCycle(int type) {
/* The dictionary where we remember key names and database ID of keys we may
* want to expire from the slave. Since this function is not often used we
* don't even care to initialize the database at startup. We'll do it once
- * the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
- * is called.
+ * the feature is used the first time, that is, when the function
+ * rememberSlaveKeyWithExpire() is called.
*
* The dictionary has an SDS string representing the key as the hash table
* key, while the value is a 64 bit unsigned integer with the bits corresponding
@@ -300,11 +346,12 @@ void expireSlaveKeys(void) {
while(dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = server.db+dbid;
- dictEntry *expire = dictFind(db->expires,keyname);
+ /* The tracked name may no longer exist in this DB, so the
+ * lookup can fail. Fetch the dict entry and take the rkey from
+ * its key field: the entry value is the robj with the data,
+ * not the key object. */
+ dictEntry *dbentry = dictFind(db->dict,keyname);
+ rkey *key = dbentry ? dictGetKey(dbentry) : NULL;
+ if (key && !(key->flags & KEY_FLAG_EXPIRE)) key = NULL;
int expired = 0;
- if (expire &&
- activeExpireCycleTryExpire(server.db+dbid,expire,start))
+ if (key &&
+ activeExpireCycleTryExpire(server.db+dbid,key,start))
{
expired = 1;
}
@@ -313,7 +360,7 @@ void expireSlaveKeys(void) {
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
- if (expire && !expired) {
+ if (key && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}
@@ -341,13 +388,15 @@ void expireSlaveKeys(void) {
/* Track keys that received an EXPIRE or similar command in the context
* of a writable slave. */
-void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
+void rememberSlaveKeyWithExpire(redisDb *db, rkey *key) {
if (slaveKeysWithExpire == NULL) {
static dictType dt = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -355,13 +404,15 @@ void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
}
if (db->id > 63) return;
- dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr);
- /* If the entry was just created, set it to a copy of the SDS string
- * representing the key: we don't want to need to take those keys
- * in sync with the main DB. The keys will be removed by expireSlaveKeys()
- * as it scans to find keys to remove. */
- if (de->key == key->ptr) {
- de->key = sdsdup(key->ptr);
+ sds skey = sdsnewlen(key->name,key->len);
+ dictEntry *de = dictAddOrFind(slaveKeysWithExpire,skey);
+ /* If the entry was already there, free the SDS string we used to lookup.
+ * Note that we don't care to take those keys in sync with the
+ * main DB. The keys will be removed by expireSlaveKeys() as it scans to
+ * find keys to remove. */
+ if (de->key != skey) {
+ sdsfree(skey);
+ } else {
dictSetUnsignedIntegerVal(de,0);
}
@@ -392,6 +443,137 @@ void flushSlaveKeysWithExpireList(void) {
}
/*-----------------------------------------------------------------------------
+ * Expires API
+ *----------------------------------------------------------------------------*/
+
+/* Remove the expire from the key making it persistent.
+ *
+ * Returns 0 without touching anything if KEY_FLAG_EXPIRE is not set in
+ * key->flags. Otherwise the key is removed from the expires tree, the
+ * flag is cleared, key->expire is reset to 0, and 1 is returned. The
+ * tree entry is removed before key->expire is reset. */
+int removeExpire(redisDb *db, rkey *key) {
+ if (!(key->flags & KEY_FLAG_EXPIRE)) return 0;
+ removeExpireFromTree(db,key);
+ key->flags &= ~KEY_FLAG_EXPIRE;
+ key->expire = 0; /* Not needed but better to leave the object clean. */
+ return 1;
+}
+
+/* Set an expire to the specified key. If the expire is set in the context
+ * of a user calling a command 'c' is the client, otherwise 'c' is set
+ * to NULL. The 'when' parameter is the absolute unix time in milliseconds
+ * after which the key will no longer be considered valid.
+ *
+ * When 'c' is not NULL, the instance is a writable slave
+ * (server.masterhost set and server.repl_slave_ro == 0) and the client is
+ * not flagged CLIENT_MASTER, the key is also passed to
+ * rememberSlaveKeyWithExpire(). */
+void setExpire(client *c, redisDb *db, rkey *key, long long when) {
+ /* If the key already has an expire, remove its current entry from the
+ * expires tree before overwriting key->expire: the entry is located
+ * via encodeExpireKey(), presumably using the old expire time. */
+ if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key);
+ key->flags |= KEY_FLAG_EXPIRE;
+ key->expire = when;
+ addExpireToTree(db,key);
+
+ int writable_slave = server.masterhost && server.repl_slave_ro == 0;
+ if (c && writable_slave && !(c->flags & CLIENT_MASTER))
+ rememberSlaveKeyWithExpire(db,key);
+}
+
+/* Return the expire time of the specified key, or -1 if no expire
+ * is associated with this key (i.e. the key is non volatile).
+ *
+ * Only the key object is inspected: key->expire is returned when
+ * KEY_FLAG_EXPIRE is set in key->flags, otherwise -1 is returned
+ * whatever key->expire contains. */
+long long getExpire(rkey *key) {
+ return (key->flags & KEY_FLAG_EXPIRE) ? key->expire : -1;
+}
+
+/* Propagate expires into slaves and the AOF file.
+ * When a key expires in the master, a DEL operation for this key is sent
+ * to all the slaves and the AOF file if enabled.
+ *
+ * This way the key expiry is centralized in one place, and since both
+ * AOF and the master->slave link guarantee operation ordering, everything
+ * will be consistent even if we allow write operations against expiring
+ * keys.
+ *
+ * 'key' is the key name object. If 'lazy' is non-zero the command name
+ * sent is shared.unlink instead of shared.del. The AOF is fed only when
+ * server.aof_state != AOF_OFF, while replicationFeedSlaves() is always
+ * called. A reference to both arguments is taken for the duration of the
+ * calls and released before returning. */
+void propagateExpire(redisDb *db, robj *key, int lazy) {
+ robj *argv[2];
+
+ argv[0] = lazy ? shared.unlink : shared.del;
+ argv[1] = key;
+ incrRefCount(argv[0]);
+ incrRefCount(argv[1]);
+
+ if (server.aof_state != AOF_OFF)
+ feedAppendOnlyFile(server.delCommand,db->id,argv,2);
+ replicationFeedSlaves(server.slaves,db->id,argv,2);
+
+ decrRefCount(argv[0]);
+ decrRefCount(argv[1]);
+}
+
+/* Check if the key is expired.
+ *
+ * Returns 1 if the key has an expire and the reference time is strictly
+ * greater than it, 0 otherwise. 0 is also returned, regardless of the
+ * expire time, for keys without an expire and while server.loading is
+ * set. */
+int keyIsExpired(rkey *key) {
+ mstime_t when = getExpire(key);
+
+ if (when < 0) return 0; /* No expire for this key */
+
+ /* Don't expire anything while loading. It will be done later. */
+ if (server.loading) return 0;
+
+ /* If we are in the context of a Lua script, we pretend that time is
+ * blocked to when the Lua script started. This way a key can expire
+ * only the first time it is accessed and not in the middle of the
+ * script execution, making propagation to slaves / AOF consistent.
+ * See issue #1525 on Github for more information. */
+ mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
+
+ return now > when;
+}
+
+/* This function is called when we are going to perform some operation
+ * in a given key, but such key may be already logically expired even if
+ * it still exists in the database. The main way this function is called
+ * is via lookupKey*() family of functions.
+ *
+ * The behavior of the function depends on the replication role of the
+ * instance, because slave instances do not expire keys, they wait
+ * for DELs from the master for consistency matters. However even
+ * slaves will try to have a coherent return value for the function,
+ * so that read commands executed in the slave side will be able to
+ * behave like if the key is expired even if still present (because the
+ * master has yet to propagate the DEL).
+ *
+ * In masters as a side effect of finding a key which is expired, such
+ * key will be evicted from the database. Also this may trigger the
+ * propagation of a DEL/UNLINK command in AOF / replication stream.
+ *
+ * The return value of the function is 0 if the key is still valid,
+ * otherwise the function returns 1 if the key is expired.
+ *
+ * 'keyname' is the key name object used for propagation, notification
+ * and deletion, while 'key' is the key object checked by keyIsExpired().
+ * Note that on a master the value returned for an expired key is
+ * whatever dbAsyncDelete() / dbSyncDelete() returns for 'keyname'. */
+int expireIfNeeded(redisDb *db, robj *keyname, rkey *key) {
+ if (!keyIsExpired(key)) return 0;
+
+ /* If we are running in the context of a slave, instead of
+ * evicting the expired key from the database, we return ASAP:
+ * the slave key expiration is controlled by the master that will
+ * send us synthesized DEL operations for expired keys.
+ *
+ * Still we try to return the right information to the caller,
+ * that is, 0 if we think the key should be still valid, 1 if
+ * we think the key is expired at this time. */
+ if (server.masterhost != NULL) return 1;
+
+ /* Delete the key */
+ server.stat_expiredkeys++;
+ propagateExpire(db,keyname,server.lazyfree_lazy_expire);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,
+ "expired",keyname,db->id);
+ return server.lazyfree_lazy_expire ? dbAsyncDelete(db,keyname) :
+ dbSyncDelete(db,keyname);
+}
+
+/* Sometimes we have just the name of the key, because we still have to
+ * look it up. In such cases this function is handier than
+ * expireIfNeeded(): it is just a wrapper performing the lookup first,
+ * with the LOOKUP_NOTOUCH flag. If the lookup returns no value, 0 is
+ * returned and expireIfNeeded() is not called. */
+int expireIfNeededByName(redisDb *db, robj *keyname) {
+ rkey *key;
+ robj *val = lookupKey(db,keyname,&key,LOOKUP_NOTOUCH);
+ if (!val) return 0;
+ return expireIfNeeded(db,keyname,key);
+}
+
+
+
+/*-----------------------------------------------------------------------------
* Expires Commands
*----------------------------------------------------------------------------*/
@@ -403,7 +585,8 @@ void flushSlaveKeysWithExpireList(void) {
* unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
* the argv[2] parameter. The basetime is always specified in milliseconds. */
void expireGenericCommand(client *c, long long basetime, int unit) {
- robj *key = c->argv[1], *param = c->argv[2];
+ robj *keyname = c->argv[1], *param = c->argv[2];
+ rkey *key;
long long when; /* unix time in milliseconds when the key will expire. */
if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
@@ -413,7 +596,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
when += basetime;
/* No key, return zero. */
- if (lookupKeyWrite(c->db,key) == NULL) {
+ if (lookupKeyWrite(c->db,keyname,&key) == NULL) {
addReply(c,shared.czero);
return;
}
@@ -427,23 +610,24 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
if (when <= mstime() && !server.loading && !server.masterhost) {
robj *aux;
- int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
- dbSyncDelete(c->db,key);
- serverAssertWithInfo(c,key,deleted);
+ int deleted = server.lazyfree_lazy_expire ?
+ dbAsyncDelete(c->db,keyname) :
+ dbSyncDelete(c->db,keyname);
+ serverAssertWithInfo(c,keyname,deleted);
server.dirty++;
/* Replicate/AOF this as an explicit DEL or UNLINK. */
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
- rewriteClientCommandVector(c,2,aux,key);
- signalModifiedKey(c->db,key);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
+ rewriteClientCommandVector(c,2,aux,keyname);
+ signalModifiedKey(c->db,keyname);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyname,c->db->id);
addReply(c, shared.cone);
return;
} else {
setExpire(c,c->db,key,when);
addReply(c,shared.cone);
- signalModifiedKey(c->db,key);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
+ signalModifiedKey(c->db,keyname);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",keyname,c->db->id);
server.dirty++;
return;
}
@@ -472,15 +656,16 @@ void pexpireatCommand(client *c) {
/* Implements TTL and PTTL */
void ttlGenericCommand(client *c, int output_ms) {
long long expire, ttl = -1;
+ rkey *key;
/* If the key does not exist at all, return -2 */
- if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == NULL) {
+ if (lookupKeyReadWithFlags(c->db,c->argv[1],&key,LOOKUP_NOTOUCH) == NULL) {
addReplyLongLong(c,-2);
return;
}
/* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */
- expire = getExpire(c->db,c->argv[1]);
+ expire = getExpire(key);
if (expire != -1) {
ttl = expire-mstime();
if (ttl < 0) ttl = 0;
@@ -504,8 +689,9 @@ void pttlCommand(client *c) {
/* PERSIST key */
void persistCommand(client *c) {
- if (lookupKeyWrite(c->db,c->argv[1])) {
- if (removeExpire(c->db,c->argv[1])) {
+ rkey *key;
+ if (lookupKeyWrite(c->db,c->argv[1],&key)) {
+ if (removeExpire(c->db,key)) {
addReply(c,shared.cone);
server.dirty++;
} else {
@@ -520,7 +706,7 @@ void persistCommand(client *c) {
void touchCommand(client *c) {
int touched = 0;
for (int j = 1; j < c->argc; j++)
- if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++;
+ if (lookupKeyRead(c->db,c->argv[j],NULL) != NULL) touched++;
addReplyLongLong(c,touched);
}
diff --git a/src/geo.c b/src/geo.c
index 826d11ff5..54d9c6d0f 100644
--- a/src/geo.c
+++ b/src/geo.c
@@ -466,8 +466,8 @@ void georadiusGeneric(client *c, int flags) {
/* Look up the requested zset */
robj *zobj = NULL;
- if ((zobj = lookupKeyReadOrReply(c, key, shared.null[c->resp])) == NULL ||
- checkType(c, zobj, OBJ_ZSET)) {
+ if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.null[c->resp]))
+ == NULL || checkType(c, zobj, OBJ_ZSET)) {
return;
}
@@ -701,7 +701,7 @@ void geohashCommand(client *c) {
int j;
/* Look up the requested zset */
- robj *zobj = lookupKeyRead(c->db, c->argv[1]);
+ robj *zobj = lookupKeyRead(c->db, c->argv[1], NULL);
if (zobj && checkType(c, zobj, OBJ_ZSET)) return;
/* Geohash elements one after the other, using a null bulk reply for
@@ -754,7 +754,7 @@ void geoposCommand(client *c) {
int j;
/* Look up the requested zset */
- robj *zobj = lookupKeyRead(c->db, c->argv[1]);
+ robj *zobj = lookupKeyRead(c->db, c->argv[1], NULL);
if (zobj && checkType(c, zobj, OBJ_ZSET)) return;
/* Report elements one after the other, using a null bulk reply for
@@ -797,7 +797,7 @@ void geodistCommand(client *c) {
/* Look up the requested zset */
robj *zobj = NULL;
- if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]))
+ if ((zobj = lookupKeyReadOrReply(c, c->argv[1], NULL, shared.null[c->resp]))
== NULL || checkType(c, zobj, OBJ_ZSET)) return;
/* Get the scores. We need both otherwise NULL is returned. */
diff --git a/src/gopher.c b/src/gopher.c
index 38e44f754..6e4485cb3 100644
--- a/src/gopher.c
+++ b/src/gopher.c
@@ -50,7 +50,7 @@ void addReplyGopherItem(client *c, const char *type, const char *descr,
* protocol. */
void processGopherRequest(client *c) {
robj *keyname = c->argc == 0 ? createStringObject("/",1) : c->argv[0];
- robj *o = lookupKeyRead(c->db,keyname);
+ robj *o = lookupKeyRead(c->db,keyname,NULL);
/* If there is no such key, return with a Gopher error. */
if (o == NULL || o->type != OBJ_STRING) {
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index e01ea6042..161ae9457 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -1179,7 +1179,7 @@ invalid:
/* PFADD var ele ele ele ... ele => :0 or :1 */
void pfaddCommand(client *c) {
- robj *o = lookupKeyWrite(c->db,c->argv[1]);
+ robj *o = lookupKeyWrite(c->db,c->argv[1],NULL);
struct hllhdr *hdr;
int updated = 0, j;
@@ -1238,7 +1238,7 @@ void pfcountCommand(client *c) {
registers = max + HLL_HDR_SIZE;
for (j = 1; j < c->argc; j++) {
/* Check type and size. */
- robj *o = lookupKeyRead(c->db,c->argv[j]);
+ robj *o = lookupKeyRead(c->db,c->argv[j],NULL);
if (o == NULL) continue; /* Assume empty HLL for non existing var.*/
if (isHLLObjectOrReply(c,o) != C_OK) return;
@@ -1259,7 +1259,7 @@ void pfcountCommand(client *c) {
*
* The user specified a single key. Either return the cached value
* or compute one and update the cache. */
- o = lookupKeyWrite(c->db,c->argv[1]);
+ o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o == NULL) {
/* No key? Cardinality is zero since no element was added, otherwise
* we would have a key as HLLADD creates it as a side effect. */
@@ -1320,7 +1320,7 @@ void pfmergeCommand(client *c) {
memset(max,0,sizeof(max));
for (j = 1; j < c->argc; j++) {
/* Check type and size. */
- robj *o = lookupKeyRead(c->db,c->argv[j]);
+ robj *o = lookupKeyRead(c->db,c->argv[j],NULL);
if (o == NULL) continue; /* Assume empty HLL for non existing var. */
if (isHLLObjectOrReply(c,o) != C_OK) return;
@@ -1338,7 +1338,7 @@ void pfmergeCommand(client *c) {
}
/* Create / unshare the destination key's value if needed. */
- robj *o = lookupKeyWrite(c->db,c->argv[1]);
+ robj *o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o == NULL) {
/* Create the key with a string value of the exact length to
* hold our HLL data structure. sdsnewlen() when NULL is passed
@@ -1497,7 +1497,7 @@ void pfdebugCommand(client *c) {
robj *o;
int j;
- o = lookupKeyWrite(c->db,c->argv[2]);
+ o = lookupKeyWrite(c->db,c->argv[2],NULL);
if (o == NULL) {
addReplyError(c,"The specified key does not exist");
return;
diff --git a/src/latency.c b/src/latency.c
index 33aa1245b..072b24ca6 100644
--- a/src/latency.c
+++ b/src/latency.c
@@ -48,10 +48,12 @@ uint64_t dictStringHash(const void *key) {
void dictVanillaFree(void *privdata, void *val);
dictType latencyTimeSeriesDictType = {
- dictStringHash, /* hash function */
+ dictStringHash, /* lookup hash function */
+ dictStringHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictStringKeyCompare, /* key compare */
+ dictStringKeyCompare, /* lookup key compare */
+ dictStringKeyCompare, /* stored key compare */
dictVanillaFree, /* key destructor */
dictVanillaFree /* val destructor */
};
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 3d3159c90..2e0bdfd42 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -51,19 +51,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
-int dbAsyncDelete(redisDb *db, robj *key) {
- /* Deleting an entry from the expires dict will not free the sds of
- * the key, because it is shared with the main dictionary. */
- if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
-
+int dbAsyncDelete(redisDb *db, robj *keyname) {
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
- dictEntry *de = dictUnlink(db->dict,key->ptr);
+ dictEntry *de = dictUnlink(db->dict,keyname->ptr);
if (de) {
+ rkey *key = dictGetKey(de);
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
+ /* Remove the entry from the expire tree since this is fast anyway. */
+ if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key);
+
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
@@ -83,7 +83,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
- if (server.cluster_enabled) slotToKeyDel(key);
+ if (server.cluster_enabled) slotToKeyDel(keyname);
return 1;
} else {
return 0;
@@ -105,11 +105,11 @@ void freeObjAsync(robj *o) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
- dict *oldht1 = db->dict, *oldht2 = db->expires;
+ void *hashtable = db->dict, *tree = db->expires;
db->dict = dictCreate(&dbDictType,NULL);
- db->expires = dictCreate(&keyptrDictType,NULL);
- atomicIncr(lazyfree_objects,dictSize(oldht1));
- bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
+ db->expires = raxNew();
+ atomicIncr(lazyfree_objects,dictSize((dict*)hashtable));
+ bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,hashtable,tree);
}
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
@@ -136,10 +136,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
* when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */
-void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
- size_t numkeys = dictSize(ht1);
- dictRelease(ht1);
- dictRelease(ht2);
+void lazyfreeFreeDatabaseFromBioThread(dict *hashtable, rax *tree) {
+ size_t numkeys = dictSize(hashtable);
+ dictRelease(hashtable);
+ raxFree(tree);
atomicDecr(lazyfree_objects,numkeys);
}
diff --git a/src/module.c b/src/module.c
index 7dee7e776..b5040a67b 100644
--- a/src/module.c
+++ b/src/module.c
@@ -148,7 +148,8 @@ typedef struct RedisModuleCtx RedisModuleCtx;
struct RedisModuleKey {
RedisModuleCtx *ctx;
redisDb *db;
- robj *key; /* Key name object. */
+ rkey *keyobj; /* Key object stored at the dictionary. */
+ robj *keyname; /* Key name object. */
robj *value; /* Value object, or NULL if the key was not found. */
void *iter; /* Iterator. */
int mode; /* Opening mode. */
@@ -436,7 +437,7 @@ int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
break;
default: return REDISMODULE_ERR;
}
- dbAdd(key->db,key->key,obj);
+ key->keyobj = dbAdd(key->db,key->keyname,obj);
key->value = obj;
return REDISMODULE_OK;
}
@@ -465,7 +466,7 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) {
}
if (isempty) {
- dbDelete(key->db,key->key);
+ dbDelete(key->db,key->keyname);
key->value = NULL;
return 1;
} else {
@@ -1519,11 +1520,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
RedisModuleKey *kp;
robj *value;
+ rkey *keyobj;
if (mode & REDISMODULE_WRITE) {
- value = lookupKeyWrite(ctx->client->db,keyname);
+ value = lookupKeyWrite(ctx->client->db,keyname,&keyobj);
} else {
- value = lookupKeyRead(ctx->client->db,keyname);
+ value = lookupKeyRead(ctx->client->db,keyname,&keyobj);
if (value == NULL) {
return NULL;
}
@@ -1533,8 +1535,9 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
kp = zmalloc(sizeof(*kp));
kp->ctx = ctx;
kp->db = ctx->client->db;
- kp->key = keyname;
+ kp->keyname = keyname;
incrRefCount(keyname);
+ kp->keyobj = keyobj;
kp->value = value;
kp->iter = NULL;
kp->mode = mode;
@@ -1546,10 +1549,10 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Close a key handle. */
void RM_CloseKey(RedisModuleKey *key) {
if (key == NULL) return;
- if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
+ if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->keyname);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
- decrRefCount(key->key);
+ decrRefCount(key->keyname);
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
zfree(key);
}
@@ -1595,13 +1598,13 @@ size_t RM_ValueLength(RedisModuleKey *key) {
int RM_DeleteKey(RedisModuleKey *key) {
if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
if (key->value) {
- dbDelete(key->db,key->key);
+ dbDelete(key->db,key->keyname);
key->value = NULL;
}
return REDISMODULE_OK;
}
-/* If the key is open for writing, unlink it (that is delete it in a
+/* If the key is open for writing, unlink it (that is delete it in a
* non-blocking way, not reclaiming memory immediately) and setup the key to
* accept new writes as an empty key (that will be created on demand).
* On success REDISMODULE_OK is returned. If the key is not open for
@@ -1609,7 +1612,7 @@ int RM_DeleteKey(RedisModuleKey *key) {
int RM_UnlinkKey(RedisModuleKey *key) {
if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
if (key->value) {
- dbAsyncDelete(key->db,key->key);
+ dbAsyncDelete(key->db,key->keyname);
key->value = NULL;
}
return REDISMODULE_OK;
@@ -1619,7 +1622,7 @@ int RM_UnlinkKey(RedisModuleKey *key) {
* If no TTL is associated with the key or if the key is empty,
* REDISMODULE_NO_EXPIRE is returned. */
mstime_t RM_GetExpire(RedisModuleKey *key) {
- mstime_t expire = getExpire(key->db,key->key);
- if (expire == -1 || key->value == NULL) return -1;
+ /* Check for an empty key before dereferencing key->keyobj:
+ * RM_DeleteKey() and RM_UnlinkKey() delete the key from the DB and
+ * reset only key->value, so key->keyobj may no longer be valid when
+ * key->value is NULL. */
+ if (key->value == NULL) return -1;
+ mstime_t expire = getExpire(key->keyobj);
+ if (expire == -1) return -1;
expire -= mstime();
return expire >= 0 ? expire : 0;
return REDISMODULE_ERR;
if (expire != REDISMODULE_NO_EXPIRE) {
expire += mstime();
- setExpire(key->ctx->client,key->db,key->key,expire);
+ setExpire(key->ctx->client,key->db,key->keyobj,expire);
} else {
- removeExpire(key->db,key->key);
+ removeExpire(key->db,key->keyobj);
}
return REDISMODULE_OK;
}
@@ -1657,7 +1660,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
- setKey(key->db,key->key,str);
+ setKey(key->db,key->keyname,str);
key->value = str;
return REDISMODULE_OK;
}
@@ -1707,7 +1710,7 @@ char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
/* For write access, and even for read access if the object is encoded,
* we unshare the string (that has the side effect of decoding it). */
if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
- key->value = dbUnshareStringValue(key->db, key->key, key->value);
+ key->value = dbUnshareStringValue(key->db, key->keyname, key->value);
*len = sdslen(key->value->ptr);
return key->value->ptr;
@@ -1737,12 +1740,12 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
if (key->value == NULL) {
/* Empty key: create it with the new size. */
robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
- setKey(key->db,key->key,o);
+ setKey(key->db,key->keyname,o);
key->value = o;
decrRefCount(o);
} else {
/* Unshare and resize. */
- key->value = dbUnshareStringValue(key->db, key->key, key->value);
+ key->value = dbUnshareStringValue(key->db, key->keyname, key->value);
size_t curlen = sdslen(key->value->ptr);
if (newlen > curlen) {
key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
@@ -3088,7 +3091,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
robj *o = createModuleObject(mt,value);
- setKey(key->db,key->key,o);
+ setKey(key->db,key->keyname,o);
decrRefCount(o);
key->value = o;
return REDISMODULE_OK;
@@ -5031,7 +5034,8 @@ int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) {
}
dictType moduleAPIDictType = {
- dictCStringKeyHash, /* hash function */
+ dictCStringKeyHash, /* lookup hash function */
+ dictCStringKeyHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictCStringKeyCompare, /* key compare */
diff --git a/src/object.c b/src/object.c
index 234e11f8a..4d0eeeca6 100644
--- a/src/object.c
+++ b/src/object.c
@@ -44,14 +44,6 @@ robj *createObject(int type, void *ptr) {
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
-
- /* Set the LRU to the current lruclock (minutes resolution), or
- * alternatively the LFU counter. */
- if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
- } else {
- o->lru = LRU_CLOCK();
- }
return o;
}
@@ -89,11 +81,6 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh+1;
o->refcount = 1;
- if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
- } else {
- o->lru = LRU_CLOCK();
- }
sh->len = len;
sh->alloc = len;
@@ -1052,8 +1039,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
- mem = dictSize(db->expires) * sizeof(dictEntry) +
- dictSlots(db->expires) * sizeof(dictEntry*);
+ #warning "Fix the memory computation here with expires overhead"
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
@@ -1202,12 +1188,12 @@ sds getMemoryDoctorReport(void) {
* The lru_idle and lru_clock args are only relevant if policy
* is MAXMEMORY_FLAG_LRU.
* Either or both of them may be <0, in that case, nothing is set. */
-void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+void objectSetLRUOrLFU(rkey *key, long long lfu_freq, long long lru_idle,
long long lru_clock) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
if (lfu_freq >= 0) {
serverAssert(lfu_freq <= 255);
- val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
+ key->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
}
} else if (lru_idle >= 0) {
/* Provided LRU idle time is in seconds. Scale
@@ -1223,7 +1209,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
* some time. */
if (lru_abs < 0)
lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX;
- val->lru = lru_abs;
+ key->lru = lru_abs;
}
}
@@ -1231,15 +1217,16 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
/* This is a helper function for the OBJECT command. We need to lookup keys
* without any modification of LRU or other parameters. */
-robj *objectCommandLookup(client *c, robj *key) {
+robj *objectCommandLookup(client *c, robj *keyname, rkey **key) {
dictEntry *de;
- if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
+ if ((de = dictFind(c->db->dict,keyname->ptr)) == NULL) return NULL;
+ if (key) *key = dictGetKey(de);
return (robj*) dictGetVal(de);
}
-robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) {
- robj *o = objectCommandLookup(c,key);
+robj *objectCommandLookupOrReply(client *c, robj *keyname, rkey **key, robj *reply) {
+ robj *o = objectCommandLookup(c,keyname,key);
if (!o) addReply(c, reply);
return o;
@@ -1260,24 +1247,26 @@ NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
- == NULL) return;
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,
+ shared.null[c->resp])) == NULL) return;
addReplyLongLong(c,o->refcount);
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
- == NULL) return;
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,
+ shared.null[c->resp])) == NULL) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
- == NULL) return;
+ rkey *key;
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],&key,
+ shared.null[c->resp])) == NULL) return;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
- addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
+ addReplyLongLong(c,estimateObjectIdleTime(key)/1000);
} else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
- if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
- == NULL) return;
+ rkey *k;
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],&k,
+ shared.null[c->resp])) == NULL) return;
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
@@ -1286,7 +1275,7 @@ NULL
* in case of the key has not been accessed for a long time,
* because we update the access time only
* when the key is read or overwritten. */
- addReplyLongLong(c,LFUDecrAndReturn(o));
+ addReplyLongLong(c,LFUDecrAndReturn(k));
} else {
addReplySubcommandSyntaxError(c);
}
diff --git a/src/rdb.c b/src/rdb.c
index 95e4766ea..39974a33e 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -475,6 +475,7 @@ ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
* RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()
* instead of a Redis object with an sds in it.
* RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
+ * RDB_LOAD_KEY: Return a key object instead of a Redis object.
*
* On I/O error NULL is returned.
*/
@@ -751,7 +752,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
/* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */
-ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
+ssize_t rdbSaveObject(rio *rdb, robj *o, rkey *key) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
@@ -966,7 +967,9 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,rdb,key);
+ robj *keyobj = createStringObject(key->name,key->len);
+ moduleInitIOContext(io,mt,rdb,keyobj);
+ decrRefCount(keyobj);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@@ -1005,7 +1008,7 @@ size_t rdbSavedObjectLen(robj *o) {
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
-int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
+int rdbSaveKeyValuePair(rio *rdb, rkey *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
@@ -1017,7 +1020,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
/* Save the LRU info. */
if (savelru) {
- uint64_t idletime = estimateObjectIdleTime(val);
+ uint64_t idletime = estimateObjectIdleTime(key);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
@@ -1026,7 +1029,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
/* Save the LFU info. */
if (savelfu) {
uint8_t buf[1];
- buf[0] = LFUDecrAndReturn(val);
+ buf[0] = LFUDecrAndReturn(key);
/* We can encode this in exactly two bytes: the opcode and an 8
* bit counter, since the frequency is logarithmic with a 0-255 range.
* Note that we do not store the halving time because to reset it
@@ -1037,7 +1040,8 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
- if (rdbSaveStringObject(rdb,key) == -1) return -1;
+ if (rdbSaveRawString(rdb,(unsigned char*)key->name,key->len) == -1)
+ return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
return 1;
}
@@ -1129,20 +1133,18 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
- expires_size = dictSize(db->expires);
+ expires_size = raxSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- sds keystr = dictGetKey(de);
- robj key, *o = dictGetVal(de);
- long long expire;
+ rkey *key = dictGetKey(de);
+ robj *o = dictGetVal(de);
- initStaticStringObject(key,keystr);
- expire = getExpire(db,&key);
- if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
+ if (rdbSaveKeyValuePair(rdb,key,o,getExpire(key)) == -1)
+ goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
@@ -1928,7 +1930,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->dict,db_size);
- dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@@ -2034,17 +2035,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(val);
} else {
/* Add the new object in the hash table */
- dbAdd(db,key,val);
+ rkey *k = dbAdd(db,key,val);
/* Set the expire time if needed */
- if (expiretime != -1) setExpire(NULL,db,key,expiretime);
+ if (expiretime != -1) setExpire(NULL,db,k,expiretime);
/* Set usage information (for eviction). */
- objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
-
- /* Decrement the key refcount since dbAdd() will take its
- * own reference. */
- decrRefCount(key);
+ objectSetLRUOrLFU(k,lfu_freq,lru_idle,lru_clock);
}
/* Reset the state that is key-specified and is populated by
diff --git a/src/rdb.h b/src/rdb.h
index 0acddf9ab..5bd3e42b1 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -120,6 +120,7 @@
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
+#define RDB_LOAD_KEY (1<<3)
#define RDB_SAVE_NONE 0
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
@@ -140,11 +141,11 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi);
-ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
+ssize_t rdbSaveObject(rio *rdb, robj *o, rkey *key);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
-int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
+int rdbSaveKeyValuePair(rio *rdb, rkey *key, robj *val, long long expiretime);
robj *rdbLoadStringObject(rio *rdb);
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 2785167a8..c8b3f41a6 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -1164,7 +1164,8 @@ static int fetchClusterSlotsConfiguration(client c) {
printf("Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
diff --git a/src/redis-cli.c b/src/redis-cli.c
index e363a2795..a1f3f4010 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1987,7 +1987,8 @@ typedef struct clusterManagerLink {
} clusterManagerLink;
static dictType clusterManagerDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
@@ -1996,10 +1997,12 @@ static dictType clusterManagerDictType = {
};
static dictType clusterManagerLinkDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
@@ -6933,7 +6936,8 @@ void type_free(void* priv_data, void* val) {
}
static dictType typeinfoDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
diff --git a/src/sentinel.c b/src/sentinel.c
index 92ea75436..95b4ff8e4 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -402,7 +402,8 @@ void dictInstancesValDestructor (void *privdata, void *obj) {
* also used for: sentinelRedisInstance->sentinels dictionary that maps
* sentinels ip:port to last seen time in Pub/Sub hello message. */
dictType instancesDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
@@ -415,7 +416,8 @@ dictType instancesDictType = {
* This is useful into sentinelGetObjectiveLeader() function in order to
* count the votes and understand who is the leader. */
dictType leaderVotesDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
@@ -425,10 +427,12 @@ dictType leaderVotesDictType = {
/* Instance renamed commands table. */
dictType renamedCommandsDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor /* val destructor */
};
diff --git a/src/server.c b/src/server.c
index 4b87b6ac2..ee2985954 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1142,16 +1142,33 @@ void dictListDestructor(void *privdata, void *val)
listRelease((list*)val);
}
-int dictSdsKeyCompare(void *privdata, const void *key1,
- const void *key2)
-{
+static int dictGenericKeyCompare(const char *key1, const char *key2, size_t len1, size_t len2) {
+ if (len1 != len2) return 0;
+ return memcmp(key1, key2, len1) == 0;
+}
+
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) {
int l1,l2;
DICT_NOTUSED(privdata);
l1 = sdslen((sds)key1);
l2 = sdslen((sds)key2);
- if (l1 != l2) return 0;
- return memcmp(key1, key2, l1) == 0;
+ return dictGenericKeyCompare(key1,key2,l1,l2);
+}
+
+int dictSdsRkeyKeyCompare(void *privdata, const void *key1, const void *key2){
+ DICT_NOTUSED(privdata);
+ sds sdskey = (sds)key1;
+ rkey *keyobj = (rkey*)key2;
+ return dictGenericKeyCompare(sdskey,keyobj->name,
+ sdslen(sdskey),keyobj->len);
+}
+
+int dictRkeyKeyCompare(void *privdata, const void *key1, const void *key2) {
+ DICT_NOTUSED(privdata);
+ rkey *k1 = (rkey*)key1;
+ rkey *k2 = (rkey*)key2;
+ return dictGenericKeyCompare(k1->name,k2->name,k1->len,k2->len);
}
/* A case insensitive version used for the command lookup table and other
@@ -1179,6 +1196,13 @@ void dictSdsDestructor(void *privdata, void *val)
sdsfree(val);
}
+void dictKeyDestructor(void *privdata, void *val)
+{
+ DICT_NOTUSED(privdata);
+
+ freeKey(val);
+}
+
int dictObjKeyCompare(void *privdata, const void *key1,
const void *key2)
{
@@ -1195,6 +1219,11 @@ uint64_t dictSdsHash(const void *key) {
return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}
+uint64_t dictKeyHash(const void *keyptr) {
+ const rkey *key = keyptr;
+ return dictGenHashFunction(key->name, key->len);
+}
+
uint64_t dictSdsCaseHash(const void *key) {
return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
}
@@ -1243,10 +1272,12 @@ uint64_t dictEncObjHash(const void *key) {
/* Generic hash table type where keys are Redis Objects, Values
* dummy pointers. */
dictType objectKeyPointerValueDictType = {
- dictEncObjHash, /* hash function */
+ dictEncObjHash, /* lookup hash function */
+ dictEncObjHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictEncObjKeyCompare, /* key compare */
+ dictEncObjKeyCompare, /* lookup key compare */
+ dictEncObjKeyCompare, /* stored key compare */
dictObjectDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -1254,80 +1285,100 @@ dictType objectKeyPointerValueDictType = {
/* Like objectKeyPointerValueDictType(), but values can be destroyed, if
* not NULL, calling zfree(). */
dictType objectKeyHeapPointerValueDictType = {
- dictEncObjHash, /* hash function */
+ dictEncObjHash, /* lookup hash function */
+ dictEncObjHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictEncObjKeyCompare, /* key compare */
+ dictEncObjKeyCompare, /* lookup key compare */
+ dictEncObjKeyCompare, /* stored key compare */
dictObjectDestructor, /* key destructor */
dictVanillaFree /* val destructor */
};
/* Set dictionary type. Keys are SDS strings, values are ot used. */
dictType setDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
dictType zsetDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
NULL, /* Note: SDS string shared & freed by skiplist */
NULL /* val destructor */
};
-/* Db->dict, keys are sds strings, vals are Redis objects. */
+/* Db->dict, keys are "rkey" key objects, vals are "robj" Redis objects.
+ *
+ * Note that this dictionary is designed to be looked up via SDS strings
+ * even if keys are stored as rkey structures. So there are two different
+ * hash functions and two different compare functions. */
dictType dbDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictKeyHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
- dictSdsDestructor, /* key destructor */
+ dictSdsRkeyKeyCompare, /* lookup key compare */
+ dictRkeyKeyCompare, /* stored key compare */
+ dictKeyDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};
/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
dictType shaScriptObjectDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};
/* Db->expires */
dictType keyptrDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
/* Command table. sds string -> command struct pointer. */
dictType commandTableDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
-/* Hash type hash table (note that small hashes are represented with ziplists) */
+/* Hash values type (note that small hashes are represented with ziplists) */
dictType hashDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor /* val destructor */
};
@@ -1336,10 +1387,12 @@ dictType hashDictType = {
* lists as values. It's used for blocking operations (BLPOP) and to
* map swapped keys to a list of clients waiting for this keys to be loaded. */
dictType keylistDictType = {
- dictObjHash, /* hash function */
+ dictObjHash, /* lookup hash function */
+ dictObjHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictObjKeyCompare, /* key compare */
+ dictObjKeyCompare, /* lookup key compare */
+ dictObjKeyCompare, /* stored key compare */
dictObjectDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
@@ -1347,10 +1400,12 @@ dictType keylistDictType = {
/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
* clusterNode structures. */
dictType clusterNodesDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -1359,10 +1414,12 @@ dictType clusterNodesDictType = {
* we can re-add this node. The goal is to avoid readding a removed
* node for some time. */
dictType clusterNodesBlackListDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -1371,20 +1428,24 @@ dictType clusterNodesBlackListDictType = {
* we can re-add this node. The goal is to avoid readding a removed
* node for some time. */
dictType modulesDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
/* Migrate cache dict type. */
dictType migrateCacheDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
+ dictSdsKeyCompare, /* lookup key compare */
+ dictSdsKeyCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -1393,10 +1454,12 @@ dictType migrateCacheDictType = {
* Keys are sds SHA1 strings, while values are not used at all in the current
* implementation. */
dictType replScriptCacheDictType = {
- dictSdsCaseHash, /* hash function */
+ dictSdsCaseHash, /* lookup hash function */
+ dictSdsCaseHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictSdsKeyCaseCompare, /* key compare */
+ dictSdsKeyCaseCompare, /* lookup key compare */
+ dictSdsKeyCaseCompare, /* stored key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
@@ -1415,8 +1478,6 @@ int htNeedsResize(dict *dict) {
void tryResizeHashTables(int dbid) {
if (htNeedsResize(server.db[dbid].dict))
dictResize(server.db[dbid].dict);
- if (htNeedsResize(server.db[dbid].expires))
- dictResize(server.db[dbid].expires);
}
/* Our hash table implementation performs rehashing incrementally while
@@ -1432,11 +1493,6 @@ int incrementallyRehash(int dbid) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
- /* Expires */
- if (dictIsRehashing(server.db[dbid].expires)) {
- dictRehashMilliseconds(server.db[dbid].expires,1);
- return 1; /* already used our millisecond for this loop... */
- }
return 0;
}
@@ -1859,7 +1915,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
- vkeys = dictSize(server.db[j].expires);
+ vkeys = raxSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
@@ -2668,7 +2724,6 @@ void resetServerStats(void) {
server.stat_numcommands = 0;
server.stat_numconnections = 0;
server.stat_expiredkeys = 0;
- server.stat_expired_stale_perc = 0;
server.stat_expired_time_cap_reached_count = 0;
server.stat_evictedkeys = 0;
server.stat_keyspace_misses = 0;
@@ -2763,7 +2818,7 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
- server.db[j].expires = dictCreate(&keyptrDictType,NULL);
+ server.db[j].expires = raxNew();
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
@@ -4109,7 +4164,6 @@ sds genRedisInfoString(char *section) {
"sync_partial_ok:%lld\r\n"
"sync_partial_err:%lld\r\n"
"expired_keys:%lld\r\n"
- "expired_stale_perc:%.2f\r\n"
"expired_time_cap_reached_count:%lld\r\n"
"evicted_keys:%lld\r\n"
"keyspace_hits:%lld\r\n"
@@ -4135,7 +4189,6 @@ sds genRedisInfoString(char *section) {
server.stat_sync_partial_ok,
server.stat_sync_partial_err,
server.stat_expiredkeys,
- server.stat_expired_stale_perc*100,
server.stat_expired_time_cap_reached_count,
server.stat_evictedkeys,
server.stat_keyspace_hits,
@@ -4331,7 +4384,7 @@ sds genRedisInfoString(char *section) {
long long keys, vkeys;
keys = dictSize(server.db[j].dict);
- vkeys = dictSize(server.db[j].expires);
+ vkeys = raxSize(server.db[j].expires);
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
diff --git a/src/server.h b/src/server.h
index 0813f8bd1..f5262487f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -63,8 +63,8 @@ typedef long long mstime_t; /* millisecond time type. */
#include "util.h" /* Misc functions useful in many places */
#include "latency.h" /* Latency monitor API */
#include "sparkline.h" /* ASCII graphs API */
-#include "quicklist.h" /* Lists are encoded as linked lists of
- N-elements flat arrays */
+#include "quicklist.h" /* Lists are encoded as linked lists of
+ N-elements flat arrays */
#include "rax.h" /* Radix tree */
/* Following includes allow test functions to be called from Redis main() */
@@ -634,14 +634,13 @@ typedef struct RedisModuleDigest {
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
+#define KEY_FLAGS_BITS (32-LRU_BITS)
#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
- unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
- * LFU data (least significant 8 bits frequency
- * and most significant 16 bits access time). */
+ unsigned flags:24;
int refcount;
void *ptr;
} robj;
@@ -660,18 +659,34 @@ typedef struct redisObject {
struct evictionPoolEntry; /* Defined in evict.c */
/* This structure is used in order to represent the output buffer of a client,
- * which is actually a linked list of blocks like that, that is: client->reply. */
+ * which is actually a linked list of clientReplyBlock blocks.
+ * Such list is stored in the client->reply field. */
typedef struct clientReplyBlock {
size_t size, used;
char buf[];
} clientReplyBlock;
+/* Representation of a Redis key. This representation is just used in order
+ * to store keys in the main dictionary. Note that keys with an expire are
+ * also stored in a separate data structure, db->expires, so that the
+ * expiration algorithm can work more efficiently. */
+#define KEY_FLAG_EXPIRE (1<<0)
+typedef struct redisKey {
+ uint32_t len;
+ unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
+ * LFU data (least significant 8 bits frequency
+ * and most significant 16 bits access time). */
+ unsigned flags:KEY_FLAGS_BITS;
+ uint64_t expire;
+ char name[];
+} rkey;
+
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
- dict *expires; /* Timeout of keys with a timeout set */
+ rax *expires; /* Sorted tree of keys with an expire set. */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
@@ -1089,7 +1104,6 @@ struct redisServer {
long long stat_numcommands; /* Number of processed commands */
long long stat_numconnections; /* Number of connections received */
long long stat_expiredkeys; /* Number of expired keys */
- double stat_expired_stale_perc; /* Percentage of keys probably expired */
long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
@@ -1674,7 +1688,7 @@ char *strEncoding(int encoding);
int compareStringObjects(robj *a, robj *b);
int collateStringObjects(robj *a, robj *b);
int equalStringObjects(robj *a, robj *b);
-unsigned long long estimateObjectIdleTime(robj *o);
+unsigned long long estimateObjectIdleTime(rkey *k);
void trimStringObjectIfNeeded(robj *o);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
@@ -1941,31 +1955,34 @@ void rewriteConfigRewriteLine(struct rewriteConfigState *state, const char *opti
int rewriteConfig(char *path);
/* db.c -- Keyspace access API */
-int removeExpire(redisDb *db, robj *key);
+int removeExpire(redisDb *db, rkey *key);
void propagateExpire(redisDb *db, robj *key, int lazy);
-int expireIfNeeded(redisDb *db, robj *key);
-long long getExpire(redisDb *db, robj *key);
-void setExpire(client *c, redisDb *db, robj *key, long long when);
-robj *lookupKey(redisDb *db, robj *key, int flags);
-robj *lookupKeyRead(redisDb *db, robj *key);
-robj *lookupKeyWrite(redisDb *db, robj *key);
-robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply);
-robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
-robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
-robj *objectCommandLookup(client *c, robj *key);
-robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
-void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+int expireIfNeeded(redisDb *db, robj *keyname, rkey *key);
+int expireIfNeededByName(redisDb *db, robj *keyname);
+long long getExpire(rkey *key);
+void setExpire(client *c, redisDb *db, rkey *key, long long when);
+robj *lookupKey(redisDb *db, robj *keyname, rkey **keyptr, int flags);
+robj *lookupKeyRead(redisDb *db, robj *keyname, rkey **keyptr);
+robj *lookupKeyWrite(redisDb *db, robj *keyname, rkey **keyptr);
+robj *lookupKeyReadOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply);
+robj *lookupKeyWriteOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply);
+robj *lookupKeyReadWithFlags(redisDb *db, robj *keyname, rkey **keyptr, int flags);
+void objectSetLRUOrLFU(rkey *key, long long lfu_freq, long long lru_idle,
long long lru_clock);
+robj *objectCommandLookup(client *c, robj *keyname, rkey **key);
+robj *objectCommandLookupOrReply(client *c, robj *keyname, rkey **key, robj *reply);
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
-void dbAdd(redisDb *db, robj *key, robj *val);
-void dbOverwrite(redisDb *db, robj *key, robj *val);
-void setKey(redisDb *db, robj *key, robj *val);
+rkey *dbAdd(redisDb *db, robj *key, robj *val);
+rkey *dbOverwrite(redisDb *db, robj *key, robj *val);
+rkey *setKey(redisDb *db, robj *keyname, robj *val);
int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key);
int dbDelete(redisDb *db, robj *key);
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
+void freeKey(rkey *key);
+void removeExpireFromTree(redisDb *db, rkey *key);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
@@ -2043,7 +2060,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
void expireSlaveKeys(void);
-void rememberSlaveKeyWithExpire(redisDb *db, robj *key);
+void rememberSlaveKeyWithExpire(redisDb *db, rkey *key);
void flushSlaveKeysWithExpireList(void);
size_t getSlaveKeyWithExpireCount(void);
@@ -2052,7 +2069,7 @@ void evictionPoolAlloc(void);
#define LFU_INIT_VAL 5
unsigned long LFUGetTimeInMinutes(void);
uint8_t LFULogIncr(uint8_t value);
-unsigned long LFUDecrAndReturn(robj *o);
+unsigned long LFUDecrAndReturn(rkey *k);
/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);
diff --git a/src/sort.c b/src/sort.c
index db26da158..b54a40355 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -107,9 +107,9 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst, int writeflag)
/* Lookup substituted key */
if (!writeflag)
- o = lookupKeyRead(db,keyobj);
+ o = lookupKeyRead(db,keyobj,NULL);
else
- o = lookupKeyWrite(db,keyobj);
+ o = lookupKeyWrite(db,keyobj,NULL);
if (o == NULL) goto noobj;
if (fieldobj) {
@@ -271,9 +271,9 @@ void sortCommand(client *c) {
/* Lookup the key to sort. It must be of the right types */
if (storekey)
- sortval = lookupKeyRead(c->db,c->argv[1]);
+ sortval = lookupKeyRead(c->db,c->argv[1],NULL);
else
- sortval = lookupKeyWrite(c->db,c->argv[1]);
+ sortval = lookupKeyWrite(c->db,c->argv[1],NULL);
if (sortval && sortval->type != OBJ_SET &&
sortval->type != OBJ_LIST &&
sortval->type != OBJ_ZSET)
diff --git a/src/t_hash.c b/src/t_hash.c
index bc70e4051..79c6053d8 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -449,7 +449,7 @@ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what) {
}
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
- robj *o = lookupKeyWrite(c->db,key);
+ robj *o = lookupKeyWrite(c->db,key,NULL);
if (o == NULL) {
o = createHashObject();
dbAdd(c->db,key,o);
@@ -679,8 +679,8 @@ static void addHashFieldToReply(client *c, robj *o, sds field) {
void hgetCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL ||
- checkType(c,o,OBJ_HASH)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
+ == NULL || checkType(c,o,OBJ_HASH)) return;
addHashFieldToReply(c, o, c->argv[2]->ptr);
}
@@ -691,7 +691,7 @@ void hmgetCommand(client *c) {
/* Don't abort when the key cannot be found. Non-existing keys are empty
* hashes, where HMGET should respond with a series of null bulks. */
- o = lookupKeyRead(c->db, c->argv[1]);
+ o = lookupKeyRead(c->db, c->argv[1], NULL);
if (o != NULL && o->type != OBJ_HASH) {
addReply(c, shared.wrongtypeerr);
return;
@@ -707,7 +707,7 @@ void hdelCommand(client *c) {
robj *o;
int j, deleted = 0, keyremoved = 0;
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_HASH)) return;
for (j = 2; j < c->argc; j++) {
@@ -734,7 +734,7 @@ void hdelCommand(client *c) {
void hlenCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_HASH)) return;
addReplyLongLong(c,hashTypeLength(o));
@@ -743,7 +743,7 @@ void hlenCommand(client *c) {
void hstrlenCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_HASH)) return;
addReplyLongLong(c,hashTypeGetValueLength(o,c->argv[2]->ptr));
}
@@ -772,8 +772,8 @@ void genericHgetallCommand(client *c, int flags) {
hashTypeIterator *hi;
int length, count = 0;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL
- || checkType(c,o,OBJ_HASH)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
+ == NULL || checkType(c,o,OBJ_HASH)) return;
/* We return a map if the user requested keys and values, like in the
* HGETALL case. Otherwise to use a flat array makes more sense. */
@@ -817,7 +817,7 @@ void hgetallCommand(client *c) {
void hexistsCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_HASH)) return;
addReply(c, hashTypeExists(o,c->argv[2]->ptr) ? shared.cone : shared.czero);
@@ -828,7 +828,7 @@ void hscanCommand(client *c) {
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
- checkType(c,o,OBJ_HASH)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan))
+ == NULL || checkType(c,o,OBJ_HASH)) return;
scanGenericCommand(c,o,cursor);
}
diff --git a/src/t_list.c b/src/t_list.c
index 54e4959b9..b8ac88e46 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -196,7 +196,7 @@ void listTypeConvert(robj *subject, int enc) {
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
- robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
+ robj *lobj = lookupKeyWrite(c->db,c->argv[1],NULL);
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
@@ -235,8 +235,8 @@ void pushxGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *subject;
- if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,subject,OBJ_LIST)) return;
+ if ((subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero))
+ == NULL || checkType(c,subject,OBJ_LIST)) return;
for (j = 2; j < c->argc; j++) {
listTypePush(subject,c->argv[j],where);
@@ -277,8 +277,8 @@ void linsertCommand(client *c) {
return;
}
- if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,subject,OBJ_LIST)) return;
+ if ((subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero))
+ == NULL || checkType(c,subject,OBJ_LIST)) return;
/* Seek pivot from head to tail */
iter = listTypeInitIterator(subject,0,LIST_TAIL);
@@ -306,13 +306,13 @@ void linsertCommand(client *c) {
}
void llenCommand(client *c) {
- robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
+ robj *o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}
void lindexCommand(client *c) {
- robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
+ robj *o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = NULL;
@@ -339,7 +339,7 @@ void lindexCommand(client *c) {
}
void lsetCommand(client *c) {
- robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = c->argv[3];
@@ -365,7 +365,7 @@ void lsetCommand(client *c) {
}
void popGenericCommand(client *c, int where) {
- robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]);
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
robj *value = listTypePop(o,where);
@@ -402,8 +402,8 @@ void lrangeCommand(client *c) {
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL
- || checkType(c,o,OBJ_LIST)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
+ == NULL || checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);
/* convert negative indexes */
@@ -448,7 +448,7 @@ void ltrimCommand(client *c) {
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.ok)) == NULL ||
checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);
@@ -496,7 +496,7 @@ void lremCommand(client *c) {
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
return;
- subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
+ subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero);
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;
listTypeIterator *li;
@@ -564,7 +564,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
void rpoplpushCommand(client *c) {
robj *sobj, *value;
- if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
== NULL || checkType(c,sobj,OBJ_LIST)) return;
if (listTypeLength(sobj) == 0) {
@@ -572,7 +572,7 @@ void rpoplpushCommand(client *c) {
* versions of Redis delete keys of empty lists. */
addReplyNull(c);
} else {
- robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+ robj *dobj = lookupKeyWrite(c->db,c->argv[2],NULL);
robj *touchedkey = c->argv[1];
if (dobj && checkType(c,dobj,OBJ_LIST)) return;
@@ -649,7 +649,7 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
} else {
/* BRPOPLPUSH */
robj *dstobj =
- lookupKeyWrite(receiver->db,dstkey);
+ lookupKeyWrite(receiver->db,dstkey,NULL);
if (!(dstobj &&
checkType(receiver,dstobj,OBJ_LIST)))
{
@@ -692,7 +692,7 @@ void blockingPopGenericCommand(client *c, int where) {
!= C_OK) return;
for (j = 1; j < c->argc-1; j++) {
- o = lookupKeyWrite(c->db,c->argv[j]);
+ o = lookupKeyWrite(c->db,c->argv[j],NULL);
if (o != NULL) {
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
@@ -753,7 +753,7 @@ void brpoplpushCommand(client *c) {
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
!= C_OK) return;
- robj *key = lookupKeyWrite(c->db, c->argv[1]);
+ robj *key = lookupKeyWrite(c->db, c->argv[1], NULL);
if (key == NULL) {
if (c->flags & CLIENT_MULTI) {
diff --git a/src/t_set.c b/src/t_set.c
index 05d9ee243..b75918ffa 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -265,7 +265,7 @@ void saddCommand(client *c) {
robj *set;
int j, added = 0;
- set = lookupKeyWrite(c->db,c->argv[1]);
+ set = lookupKeyWrite(c->db,c->argv[1],NULL);
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr);
dbAdd(c->db,c->argv[1],set);
@@ -291,7 +291,7 @@ void sremCommand(client *c) {
robj *set;
int j, deleted = 0, keyremoved = 0;
- if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;
for (j = 2; j < c->argc; j++) {
@@ -317,8 +317,8 @@ void sremCommand(client *c) {
void smoveCommand(client *c) {
robj *srcset, *dstset, *ele;
- srcset = lookupKeyWrite(c->db,c->argv[1]);
- dstset = lookupKeyWrite(c->db,c->argv[2]);
+ srcset = lookupKeyWrite(c->db,c->argv[1],NULL);
+ dstset = lookupKeyWrite(c->db,c->argv[2],NULL);
ele = c->argv[3];
/* If the source key does not exist return 0 */
@@ -373,7 +373,7 @@ void smoveCommand(client *c) {
void sismemberCommand(client *c) {
robj *set;
- if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;
if (setTypeIsMember(set,c->argv[2]->ptr))
@@ -385,7 +385,7 @@ void sismemberCommand(client *c) {
void scardCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_SET)) return;
addReplyLongLong(c,setTypeSize(o));
@@ -415,7 +415,7 @@ void spopWithCountCommand(client *c) {
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set. Otherwise, return nil */
- if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
/* If count is zero, serve an empty multibulk ASAP to avoid special
@@ -566,7 +566,7 @@ void spopCommand(client *c) {
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set */
- if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
/* Get a random element from the set */
@@ -632,7 +632,7 @@ void srandmemberWithCountCommand(client *c) {
uniq = 0;
}
- if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
size = setTypeSize(set);
@@ -760,7 +760,7 @@ void srandmemberCommand(client *c) {
return;
}
- if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
encoding = setTypeRandomElement(set,&ele,&llele);
@@ -802,8 +802,8 @@ void sinterGenericCommand(client *c, robj **setkeys,
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
- lookupKeyWrite(c->db,setkeys[j]) :
- lookupKeyRead(c->db,setkeys[j]);
+ lookupKeyWrite(c->db,setkeys[j],NULL) :
+ lookupKeyRead(c->db,setkeys[j],NULL);
if (!setobj) {
zfree(sets);
if (dstkey) {
@@ -939,8 +939,8 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
- lookupKeyWrite(c->db,setkeys[j]) :
- lookupKeyRead(c->db,setkeys[j]);
+ lookupKeyWrite(c->db,setkeys[j],NULL) :
+ lookupKeyRead(c->db,setkeys[j],NULL);
if (!setobj) {
sets[j] = NULL;
continue;
@@ -1110,7 +1110,7 @@ void sscanCommand(client *c) {
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
- if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
- checkType(c,set,OBJ_SET)) return;
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan))
+ == NULL || checkType(c,set,OBJ_SET)) return;
scanGenericCommand(c,set,cursor);
}
diff --git a/src/t_stream.c b/src/t_stream.c
index 9e7d3d126..2fa59fa21 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1057,7 +1057,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
/* Look the stream at 'key' and return the corresponding stream object.
* The function creates a key setting it to an empty stream if needed. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
- robj *o = lookupKeyWrite(c->db,key);
+ robj *o = lookupKeyWrite(c->db,key,NULL);
if (o == NULL) {
o = createStreamObject();
dbAdd(c->db,key,o);
@@ -1287,8 +1287,8 @@ void xrangeGenericCommand(client *c, int rev) {
}
/* Return the specified range to the user. */
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
- checkType(c,o,OBJ_STREAM)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyarray))
+ == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
@@ -1313,7 +1313,7 @@ void xrevrangeCommand(client *c) {
/* XLEN */
void xlenCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
addReplyLongLong(c,s->length);
@@ -1411,7 +1411,7 @@ void xreadCommand(client *c) {
* starting from now. */
int id_idx = i - streams_arg - streams_count;
robj *key = c->argv[i-streams_count];
- robj *o = lookupKeyRead(c->db,key);
+ robj *o = lookupKeyRead(c->db,key,NULL);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
streamCG *group = NULL;
@@ -1469,7 +1469,7 @@ void xreadCommand(client *c) {
size_t arraylen = 0;
void *arraylen_ptr = NULL;
for (int i = 0; i < streams_count; i++) {
- robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
+ robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i],NULL);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */
@@ -1736,7 +1736,7 @@ NULL
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
- o = lookupKeyWrite(c->db,c->argv[2]);
+ o = lookupKeyWrite(c->db,c->argv[2],NULL);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
@@ -1840,7 +1840,7 @@ NULL
*
* Set the internal "last ID" of a stream. */
void xsetidCommand(client *c) {
- robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -1881,7 +1881,7 @@ void xsetidCommand(client *c) {
*/
void xackCommand(client *c) {
streamCG *group = NULL;
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
@@ -1952,7 +1952,7 @@ void xpendingCommand(client *c) {
}
/* Lookup the key and the group inside the stream. */
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
streamCG *group;
if (o && checkType(c,o,OBJ_STREAM)) return;
@@ -2131,7 +2131,7 @@ void xpendingCommand(client *c) {
* what messages it is now in charge of. */
void xclaimCommand(client *c) {
streamCG *group = NULL;
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
long long minidle; /* Minimum idle time argument. */
long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
@@ -2323,7 +2323,7 @@ void xclaimCommand(client *c) {
void xdelCommand(client *c) {
robj *o;
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -2368,7 +2368,7 @@ void xtrimCommand(client *c) {
/* If the key does not exist, we are ok returning zero, that is, the
* number of elements removed from the stream. */
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -2460,7 +2460,7 @@ NULL
key = c->argv[2];
/* Lookup the key now, this is common for all the subcommands but HELP. */
- robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);
+ robj *o = lookupKeyWriteOrReply(c,key,NULL,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
diff --git a/src/t_string.c b/src/t_string.c
index 5800c5c0c..6c7c666ad 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -64,7 +64,7 @@ static int checkStringLength(client *c, long long size) {
#define OBJ_SET_EX (1<<2) /* Set if time in seconds is given */
#define OBJ_SET_PX (1<<3) /* Set if time in ms in given */
-void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
+void setGenericCommand(client *c, int flags, robj *keyname, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
if (expire) {
@@ -77,18 +77,18 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
- if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
- (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
+ if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,keyname,NULL) != NULL) ||
+ (flags & OBJ_SET_XX && lookupKeyWrite(c->db,keyname,NULL) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
- setKey(c->db,key,val);
+ rkey *key = setKey(c->db,keyname,val);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
- notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
+ notifyKeyspaceEvent(NOTIFY_STRING,"set",keyname,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
- "expire",key,c->db->id);
+ "expire",keyname,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}
@@ -157,8 +157,8 @@ void psetexCommand(client *c) {
int getGenericCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)
- return C_OK;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]))
+ == NULL) return C_OK;
if (o->type != OBJ_STRING) {
addReply(c,shared.wrongtypeerr);
@@ -194,7 +194,7 @@ void setrangeCommand(client *c) {
return;
}
- o = lookupKeyWrite(c->db,c->argv[1]);
+ o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o == NULL) {
/* Return 0 when setting nothing on a non-existing string */
if (sdslen(value) == 0) {
@@ -251,8 +251,8 @@ void getrangeCommand(client *c) {
return;
if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk)) == NULL ||
- checkType(c,o,OBJ_STRING)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptybulk)) == NULL
+ || checkType(c,o,OBJ_STRING)) return;
if (o->encoding == OBJ_ENCODING_INT) {
str = llbuf;
@@ -287,7 +287,7 @@ void mgetCommand(client *c) {
addReplyArrayLen(c,c->argc-1);
for (j = 1; j < c->argc; j++) {
- robj *o = lookupKeyRead(c->db,c->argv[j]);
+ robj *o = lookupKeyRead(c->db,c->argv[j],NULL);
if (o == NULL) {
addReplyNull(c);
} else {
@@ -312,7 +312,7 @@ void msetGenericCommand(client *c, int nx) {
* set anything if at least one key alerady exists. */
if (nx) {
for (j = 1; j < c->argc; j += 2) {
- if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
+ if (lookupKeyWrite(c->db,c->argv[j],NULL) != NULL) {
addReply(c, shared.czero);
return;
}
@@ -340,7 +340,7 @@ void incrDecrCommand(client *c, long long incr) {
long long value, oldvalue;
robj *o, *new;
- o = lookupKeyWrite(c->db,c->argv[1]);
+ o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;
@@ -400,7 +400,7 @@ void incrbyfloatCommand(client *c) {
long double incr, value;
robj *o, *new, *aux;
- o = lookupKeyWrite(c->db,c->argv[1]);
+ o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
if (getLongDoubleFromObjectOrReply(c,o,&value,NULL) != C_OK ||
getLongDoubleFromObjectOrReply(c,c->argv[2],&incr,NULL) != C_OK)
@@ -434,7 +434,7 @@ void appendCommand(client *c) {
size_t totlen;
robj *o, *append;
- o = lookupKeyWrite(c->db,c->argv[1]);
+ o = lookupKeyWrite(c->db,c->argv[1],NULL);
if (o == NULL) {
/* Create the key */
c->argv[2] = tryObjectEncoding(c->argv[2]);
@@ -465,7 +465,7 @@ void appendCommand(client *c) {
void strlenCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
addReplyLongLong(c,stringObjectLen(o));
}
diff --git a/src/t_zset.c b/src/t_zset.c
index fb7078abd..f6f2239e4 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1597,7 +1597,7 @@ void zaddGenericCommand(client *c, int flags) {
}
/* Lookup the key and create the sorted set if does not exist. */
- zobj = lookupKeyWrite(c->db,key);
+ zobj = lookupKeyWrite(c->db,key,NULL);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
if (server.zset_max_ziplist_entries == 0 ||
@@ -1665,7 +1665,7 @@ void zremCommand(client *c) {
robj *zobj;
int deleted = 0, keyremoved = 0, j;
- if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ if ((zobj = lookupKeyWriteOrReply(c,key,NULL,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
for (j = 2; j < c->argc; j++) {
@@ -1718,7 +1718,7 @@ void zremrangeGenericCommand(client *c, int rangetype) {
}
/* Step 2: Lookup & range sanity checks if needed. */
- if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ if ((zobj = lookupKeyWriteOrReply(c,key,NULL,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (rangetype == ZRANGE_RANK) {
@@ -2165,7 +2165,8 @@ uint64_t dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
dictType setAccumulatorDictType = {
- dictSdsHash, /* hash function */
+ dictSdsHash, /* lookup hash function */
+ dictSdsHash, /* stored hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
@@ -2205,7 +2206,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
/* read keys to be used for input */
src = zcalloc(sizeof(zsetopsrc) * setnum);
for (i = 0, j = 3; i < setnum; i++, j++) {
- robj *obj = lookupKeyWrite(c->db,c->argv[j]);
+ robj *obj = lookupKeyWrite(c->db,c->argv[j],NULL);
if (obj != NULL) {
if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
zfree(src);
@@ -2427,7 +2428,7 @@ void zrangeGenericCommand(client *c, int reverse) {
return;
}
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) == NULL
|| checkType(c,zobj,OBJ_ZSET)) return;
/* Sanitize indexes. */
@@ -2575,8 +2576,8 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
}
/* Ok, lookup the key and get the range */
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
- checkType(c,zobj,OBJ_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp]))
+ == NULL || checkType(c,zobj,OBJ_ZSET)) return;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
@@ -2730,7 +2731,7 @@ void zcountCommand(client *c) {
}
/* Lookup the sorted set */
- if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
+ if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.czero)) == NULL ||
checkType(c, zobj, OBJ_ZSET)) return;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -2807,7 +2808,7 @@ void zlexcountCommand(client *c) {
}
/* Lookup the sorted set */
- if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
+ if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.czero)) == NULL ||
checkType(c, zobj, OBJ_ZSET))
{
zslFreeLexRange(&range);
@@ -2920,8 +2921,8 @@ void genericZrangebylexCommand(client *c, int reverse) {
}
/* Ok, lookup the key and get the range */
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
- checkType(c,zobj,OBJ_ZSET))
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp]))
+ == NULL || checkType(c,zobj,OBJ_ZSET))
{
zslFreeLexRange(&range);
return;
@@ -3065,7 +3066,7 @@ void zcardCommand(client *c) {
robj *key = c->argv[1];
robj *zobj;
- if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
addReplyLongLong(c,zsetLength(zobj));
@@ -3076,8 +3077,8 @@ void zscoreCommand(client *c) {
robj *zobj;
double score;
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
- checkType(c,zobj,OBJ_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) == NULL
+ || checkType(c,zobj,OBJ_ZSET)) return;
if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
addReplyNull(c);
@@ -3092,8 +3093,8 @@ void zrankGenericCommand(client *c, int reverse) {
robj *zobj;
long rank;
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
- checkType(c,zobj,OBJ_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp]))
+ == NULL || checkType(c,zobj,OBJ_ZSET)) return;
serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
rank = zsetRank(zobj,ele->ptr,reverse);
@@ -3117,8 +3118,8 @@ void zscanCommand(client *c) {
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
- checkType(c,o,OBJ_ZSET)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan))
+ == NULL || checkType(c,o,OBJ_ZSET)) return;
scanGenericCommand(c,o,cursor);
}
@@ -3153,7 +3154,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
idx = 0;
while (idx < keyc) {
key = keyv[idx++];
- zobj = lookupKeyWrite(c->db,key);
+ zobj = lookupKeyWrite(c->db,key,NULL);
if (!zobj) continue;
if (checkType(c,zobj,OBJ_ZSET)) return;
break;
@@ -3265,7 +3266,7 @@ void blockingGenericZpopCommand(client *c, int where) {
!= C_OK) return;
for (j = 1; j < c->argc-1; j++) {
- o = lookupKeyWrite(c->db,c->argv[j]);
+ o = lookupKeyWrite(c->db,c->argv[j],NULL);
if (o != NULL) {
if (o->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);