/* Implementation of EXPIRE (keys with fixed time to live).
 *
 * ----------------------------------------------------------------------------
 *
 * Copyright (c) 2009-2016, Salvatore Sanfilippo
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include <limits.h> /* LLONG_MAX / LLONG_MIN for the expire overflow checks. */

/*-----------------------------------------------------------------------------
 * Incremental collection of expired keys.
 *
 * When keys are accessed they are expired on-access. However we need a
 * mechanism in order to ensure keys are eventually removed when expired even
 * if no access is performed on them.
 *----------------------------------------------------------------------------*/

/* Helper function for the activeExpireCycle() function.
 * This function will try to expire the key that is stored in the hash table
 * entry 'de' of the 'expires' hash table of a Redis database.
 *
 * If the key is found to be expired, it is removed from the database and
 * 1 is returned. Otherwise no operation is performed and 0 is returned.
 *
 * When a key is expired, server.stat_expiredkeys is incremented, the expire
 * is propagated via propagateExpire() and an "expired" keyspace event is
 * notified.
 *
 * Parameters:
 *   db  - database owning the 'expires' table 'de' belongs to.
 *   de  - entry of db->expires; its value is the expire time that is
 *         compared against 'now'.
 *   now - the current time in milliseconds, passed by the caller to avoid
 *         too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        /* Build a temporary object for the key name: it is released below
         * once the deletion has been propagated and notified. */
        robj *keyobj = createStringObject(key,sdslen(key));

        propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
        if (server.lazyfree_lazy_expire)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        notifyKeyspaceEvent(NOTIFY_EXPIRED,
            "expired",keyobj,db->id);
        decrRefCount(keyobj);
        server.stat_expiredkeys++;
        return 1;
    } else {
        return 0;
    }
}

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * No more than CRON_DBS_PER_CALL databases are tested at every
 * iteration.
 *
 * This kind of call is used when Redis detects that timelimit_exit is
 * true, so there is more work to do, and we do it more incrementally from
 * the beforeSleep() function of the event loop.
 *
 * Expire cycle type:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
void activeExpireCycle(int type) {
    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit. Also don't repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        if (!timelimit_exit) return;
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
     * per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. */
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */
    long total_sampled = 0;
    long total_expired = 0;

    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots getting random
             * keys is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                /* Read the TTL before trying to expire the entry: 'de' may
                 * be removed by activeExpireCycleTryExpire(). */
                ttl = dictGetSignedIntegerVal(de)-now;
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl > 0) {
                    /* We want the average TTL of keys yet not expired. */
                    ttl_sum += ttl;
                    ttl_samples++;
                }
                total_sampled++;
            }
            total_expired += expired;

            /* Update the average TTL stats for this database. */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;

                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of microseconds return to the
             * caller waiting for the other active expire cycle. */
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            /* We don't repeat the cycle if there are less than 25% of keys
             * found expired in the current DB. */
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }

    /* 'elapsed' is in microseconds, the latency sample is in milliseconds. */
    elapsed = ustime()-start;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}

/*-----------------------------------------------------------------------------
 * Expires of keys created in writable slaves
 *
 * Normally slaves do not process expires: they wait the masters to synthesize
 * DEL operations in order to retain consistency. However writable slaves are
 * an exception: if a key is created in the slave and an expire is assigned
 * to it, we need a way to expire such a key, since the master does not know
 * anything about such a key.
 *
 * In order to do so, we track keys created in the slave side with an expire
 * set, and call the expireSlaveKeys() function from time to time in order to
 * reclaim the keys if they already expired.
 *
 * Note that the use case we are trying to cover here, is a popular one where
 * slaves are put in writable mode in order to compute slow operations in
 * the slave side that are mostly useful to actually read data in a more
 * processed way. Think at sets intersections in a tmp key, with an expire so
 * that it is also used as a cache to avoid intersecting every time.
 *
 * This implementation is currently not perfect but a lot better than leaking
 * the keys as implemented in 3.2.
 *----------------------------------------------------------------------------*/

/* The dictionary where we remember key names and database ID of keys we may
 * want to expire from the slave. Since this function is not often used we
 * don't even care to initialize the database at startup. We'll do it once
 * the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
 * is called.
 *
 * The dictionary has an SDS string representing the key as the hash table
 * key, while the value is a 64 bit unsigned integer with the bits corresponding
 * to the DB where the keys may exist set to 1. Currently the keys created
 * with a DB id > 63 are not expired, but a trivial fix is to set the bitmap
 * to the max 64 bit unsigned value when we know there is a key with a DB
 * ID greater than 63, and check all the configured DBs in such a case. */
dict *slaveKeysWithExpire = NULL;

/* Check the set of keys that received an expire directly in this writable
 * slave (the ones tracked in slaveKeysWithExpire) in order to check if they
 * should be evicted. Does nothing if no key is being tracked. */
void expireSlaveKeys(void) {
    if (slaveKeysWithExpire == NULL ||
        dictSize(slaveKeysWithExpire) == 0) return;

    int cycles = 0, noexpire = 0;
    mstime_t start = mstime();
    while(1) {
        dictEntry *de = dictGetRandomKey(slaveKeysWithExpire);
        sds keyname = dictGetKey(de);
        uint64_t dbids = dictGetUnsignedIntegerVal(de);
        uint64_t new_dbids = 0;

        /* Check the key against every database corresponding to the
         * bits set in the value bitmap. */
        int dbid = 0;
        while(dbids && dbid < server.dbnum) {
            if ((dbids & 1) != 0) {
                redisDb *db = server.db+dbid;
                dictEntry *expire = dictFind(db->expires,keyname);
                int expired = 0;

                if (expire &&
                    activeExpireCycleTryExpire(server.db+dbid,expire,start))
                {
                    expired = 1;
                }

                /* If the key was not expired in this DB, we need to set the
                 * corresponding bit in the new bitmap we set as value.
                 * At the end of the loop if the bitmap is zero, it means we
                 * no longer need to keep track of this key. */
                if (expire && !expired) {
                    noexpire++;
                    new_dbids |= (uint64_t)1 << dbid;
                }
            }
            dbid++;
            dbids >>= 1;
        }

        /* Set the new bitmap as value of the key, in the dictionary
         * of keys with an expire set directly in the writable slave. Otherwise
         * if the bitmap is zero, we no longer need to keep track of it. */
        if (new_dbids)
            dictSetUnsignedIntegerVal(de,new_dbids);
        else
            dictDelete(slaveKeysWithExpire,keyname);

        /* Stop conditions: more than 3 key/DB pairs we can't expire yet were
         * found during this call (the counter is cumulative, it is not reset
         * when a key is expired), the time limit was reached (checked once
         * every 64 cycles), or there is nothing left to track. */
        cycles++;
        if (noexpire > 3) break;
        if ((cycles % 64) == 0 && mstime()-start > 1) break;
        if (dictSize(slaveKeysWithExpire) == 0) break;
    }
}

/* Track keys that received an EXPIRE or similar command in the context
 * of a writable slave.
 *
 * The tracking dictionary is created lazily the first time this function is
 * called. Keys living in a DB with id > 63 are not tracked, since the DB id
 * would not fit in the 64 bit bitmap used as value. */
void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
    if (slaveKeysWithExpire == NULL) {
        static dictType dt = {
            dictSdsHash,                /* hash function */
            NULL,                       /* key dup */
            NULL,                       /* val dup */
            dictSdsKeyCompare,          /* key compare */
            dictSdsDestructor,          /* key destructor */
            NULL                        /* val destructor */
        };
        slaveKeysWithExpire = dictCreate(&dt,NULL);
    }
    if (db->id > 63) return;

    dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr);
    /* If the entry was just created, set it to a copy of the SDS string
     * representing the key: we don't want to need to take those keys
     * in sync with the main DB. The keys will be removed by expireSlaveKeys()
     * as it scans to find keys to remove. */
    if (de->key == key->ptr) {
        de->key = sdsdup(key->ptr);
        dictSetUnsignedIntegerVal(de,0);
    }

    /* Flag the DB of this key in the bitmap stored as entry value. */
    uint64_t dbids = dictGetUnsignedIntegerVal(de);
    dbids |= (uint64_t)1 << db->id;
    dictSetUnsignedIntegerVal(de,dbids);
}

/* Return the number of keys we are tracking (0 if the tracking dictionary
 * was never created). */
size_t getSlaveKeyWithExpireCount(void) {
    if (slaveKeysWithExpire == NULL) return 0;
    return dictSize(slaveKeysWithExpire);
}

/* Remove the keys in the hash table. We need to do that when data is
 * flushed from the server. We may receive new keys from the master with
 * the same name/db and it is no longer a good idea to expire them.
 *
 * Note: technically we should handle the case of a single DB being flushed
 * but it is not worth it since anyway race conditions using the same set
 * of key names in a writable slave and in its master will lead to
 * inconsistencies. This is just a best-effort thing we do. */
void flushSlaveKeysWithExpireList(void) {
    if (slaveKeysWithExpire) {
        dictRelease(slaveKeysWithExpire);
        slaveKeysWithExpire = NULL;
    }
}

/*-----------------------------------------------------------------------------
 * Expires Commands
 *----------------------------------------------------------------------------*/

/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the command second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds.
 *
 * If converting argv[2] to milliseconds, or adding the basetime to it, would
 * overflow a long long, an error is replied and the key is left untouched. */
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */

    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;

    /* Negative values are allowed (they mean "already expired"), but the
     * value is provided by the client, so both the unit conversion and the
     * basetime addition must be range checked: signed overflow is undefined
     * behavior and could wrap a far-future expire into a time in the past,
     * deleting the key. */
    if (unit == UNIT_SECONDS) {
        if (when > LLONG_MAX/1000 || when < LLONG_MIN/1000) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        when *= 1000;
    }
    if ((basetime > 0 && when > LLONG_MAX-basetime) ||
        (basetime < 0 && when < LLONG_MIN-basetime))
    {
        addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
        return;
    }
    when += basetime;

    /* No key, return zero. */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
    if (when <= mstime() && !server.loading && !server.masterhost) {
        robj *aux;

        int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
                                                    dbSyncDelete(c->db,key);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

/* EXPIRE key seconds */
void expireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
}

/* EXPIREAT key time */
void expireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_SECONDS);
}

/* PEXPIRE key milliseconds */
void pexpireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
}

/* PEXPIREAT key ms_time */
void pexpireatCommand(client *c) {
    expireGenericCommand(c,0,UNIT_MILLISECONDS);
}

/* Implements TTL and PTTL.
 *
 * Replies -2 if the key does not exist, -1 if the key exists but has no
 * expire, otherwise the remaining time to live (never negative): in
 * milliseconds if 'output_ms' is true, or in seconds, rounded to the nearest
 * second, otherwise. */
void ttlGenericCommand(client *c, int output_ms) {
    long long expire, ttl = -1;

    /* If the key does not exist at all, return -2 */
    if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == NULL) {
        addReplyLongLong(c,-2);
        return;
    }
    /* The key exists. Return -1 if it has no expire, or the actual
     * TTL value otherwise. */
    expire = getExpire(c->db,c->argv[1]);
    if (expire != -1) {
        ttl = expire-mstime();
        if (ttl < 0) ttl = 0;
    }
    if (ttl == -1) {
        addReplyLongLong(c,-1);
    } else {
        addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000));
    }
}

/* TTL key */
void ttlCommand(client *c) {
    ttlGenericCommand(c, 0);
}

/* PTTL key */
void pttlCommand(client *c) {
    ttlGenericCommand(c, 1);
}

/* PERSIST key
 *
 * Replies 1 if the key exists and its expire was removed, 0 otherwise. */
void persistCommand(client *c) {
    if (lookupKeyWrite(c->db,c->argv[1])) {
        if (removeExpire(c->db,c->argv[1])) {
            addReply(c,shared.cone);
            server.dirty++;
        } else {
            addReply(c,shared.czero);
        }
    } else {
        addReply(c,shared.czero);
    }
}

/* TOUCH key1 [key2 key3 ... keyN]
 *
 * Replies with the number of the specified keys that were found by
 * lookupKeyRead(). */
void touchCommand(client *c) {
    int touched = 0;
    for (int j = 1; j < c->argc; j++)
        if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++;
    addReplyLongLong(c,touched);
}