diff options
Diffstat (limited to 'src/db.c')
-rw-r--r-- | src/db.c | 55 |
1 files changed, 48 insertions, 7 deletions
@@ -38,6 +38,15 @@ * C-level DB API *----------------------------------------------------------------------------*/ +/* Update LFU when an object is accessed. + * Firstly, decrement the counter if the decrement time is reached. + * Then logarithmically increment the counter, and update the access time. */ +void updateLFU(robj *val) { + unsigned long counter = LFUDecrAndReturn(val); + counter = LFULogIncr(counter); + val->lru = (LFUGetTimeInMinutes()<<8) | counter; +} + /* Low level key lookup API, not actually called directly from commands * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ @@ -54,9 +63,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { !(flags & LOOKUP_NOTOUCH)) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - unsigned long ldt = val->lru >> 8; - unsigned long counter = LFULogIncr(val->lru & 255); - val->lru = (ldt << 8) | counter; + updateLFU(val); } else { val->lru = LRU_CLOCK(); } @@ -162,9 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST) signalListAsReady(db, key); + if (val->type == OBJ_LIST) signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); - } +} /* Overwrite an existing key with a new value. Incrementing the reference * count of the new value is up to the caller. @@ -180,6 +187,9 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { int saved_lru = old->lru; dictReplace(db->dict, key->ptr, val); val->lru = saved_lru; + /* LFU should be not only copied but also updated + * when a key is overwritten. */ + updateLFU(val); } else { dictReplace(db->dict, key->ptr, val); } @@ -788,6 +798,7 @@ void typeCommand(client *c) { case OBJ_SET: type = "set"; break; case OBJ_ZSET: type = "zset"; break; case OBJ_HASH: type = "hash"; break; + case OBJ_STREAM: type = "stream"; break; case OBJ_MODULE: { moduleValue *mv = o->ptr; type = mv->type->name; @@ -941,8 +952,8 @@ void scanDatabaseForReadyLists(redisDb *db) { while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); - if (value && value->type == OBJ_LIST) - signalListAsReady(db, key); + if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM)) + signalKeyAsReady(db, key); } dictReleaseIterator(di); } @@ -1352,6 +1363,36 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk return keys; } +/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] + * [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N + * ID_1 ID_2 ... ID_N */ +int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { + int i, num, *keys; + UNUSED(cmd); + + /* We need to seek the last argument that contains "STREAMS", because other + * arguments before may contain it (for example the group name). */ + int streams_pos = -1; + for (i = 1; i < argc; i++) { + char *arg = argv[i]->ptr; + if (!strcasecmp(arg, "streams")) streams_pos = i; + } + if (streams_pos != -1) num = argc - streams_pos - 1; + + /* Syntax error. */ + if (streams_pos == -1 || num % 2 != 0) { + *numkeys = 0; + return NULL; + } + num /= 2; /* We have half the keys as there are arguments because + there are also the IDs, one per key. */ + + keys = zmalloc(sizeof(int) * num); + for (i = streams_pos+1; i < argc; i++) keys[i-streams_pos-1] = i; + *numkeys = num; + return keys; +} + /* Slot to Key API. This is used by Redis Cluster in order to obtain in * a fast way a key that belongs to a specified hash slot. This is useful * while rehashing the cluster and in other conditions when we need to |