summaryrefslogtreecommitdiff
path: root/src/db.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.c')
-rw-r--r--src/db.c55
1 files changed, 48 insertions, 7 deletions
diff --git a/src/db.c b/src/db.c
index 71c642d00..0ded55586 100644
--- a/src/db.c
+++ b/src/db.c
@@ -38,6 +38,15 @@
* C-level DB API
*----------------------------------------------------------------------------*/
+/* Update LFU when an object is accessed.
+ * Firstly, decrement the counter if the decrement time is reached.
+ * Then logarithmically increment the counter, and update the access time. */
+void updateLFU(robj *val) {
+ unsigned long counter = LFUDecrAndReturn(val);
+ counter = LFULogIncr(counter);
+ val->lru = (LFUGetTimeInMinutes()<<8) | counter;
+}
+
/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
@@ -54,9 +63,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- unsigned long ldt = val->lru >> 8;
- unsigned long counter = LFULogIncr(val->lru & 255);
- val->lru = (ldt << 8) | counter;
+ updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
@@ -162,9 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
- if (val->type == OBJ_LIST) signalListAsReady(db, key);
+ if (val->type == OBJ_LIST) signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
- }
+}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
@@ -180,6 +187,9 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
int saved_lru = old->lru;
dictReplace(db->dict, key->ptr, val);
val->lru = saved_lru;
+ /* LFU should be not only copied but also updated
+ * when a key is overwritten. */
+ updateLFU(val);
} else {
dictReplace(db->dict, key->ptr, val);
}
@@ -788,6 +798,7 @@ void typeCommand(client *c) {
case OBJ_SET: type = "set"; break;
case OBJ_ZSET: type = "zset"; break;
case OBJ_HASH: type = "hash"; break;
+ case OBJ_STREAM: type = "stream"; break;
case OBJ_MODULE: {
moduleValue *mv = o->ptr;
type = mv->type->name;
@@ -941,8 +952,8 @@ void scanDatabaseForReadyLists(redisDb *db) {
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
- if (value && value->type == OBJ_LIST)
- signalListAsReady(db, key);
+ if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM))
+ signalKeyAsReady(db, key);
}
dictReleaseIterator(di);
}
@@ -1352,6 +1363,36 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
return keys;
}
+/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
+ * [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
+ * ID_1 ID_2 ... ID_N */
+int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, num, *keys;
+ UNUSED(cmd);
+
+ /* We need to seek the last argument that contains "STREAMS", because other
+ * arguments before may contain it (for example the group name). */
+ int streams_pos = -1;
+ for (i = 1; i < argc; i++) {
+ char *arg = argv[i]->ptr;
+ if (!strcasecmp(arg, "streams")) streams_pos = i;
+ }
+ if (streams_pos != -1) num = argc - streams_pos - 1;
+
+ /* Syntax error. */
+ if (streams_pos == -1 || num % 2 != 0) {
+ *numkeys = 0;
+ return NULL;
+ }
+ num /= 2; /* We have half the keys as there are arguments because
+ there are also the IDs, one per key. */
+
+ keys = zmalloc(sizeof(int) * num);
+ for (i = streams_pos+1; i < argc; i++) keys[i-streams_pos-1] = i;
+ *numkeys = num;
+ return keys;
+}
+
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster and in other conditions when we need to