#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "cluster.h"

/* Number of objects handed to the background job queue and not yet
 * released. Incremented when a job is queued, decremented by the
 * lazyfreeFree*FromBioThread() functions below. */
static redisAtomic size_t lazyfree_objects = 0;

/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
    size_t aux;
    atomicGet(lazyfree_objects,aux);
    return aux;
}

/* Return the amount of work needed in order to free an object.
 * The return value is not always the actual number of allocations the
 * object is composed of, but a number proportional to it.
 *
 * For strings the function always returns 1.
 *
 * For aggregated objects represented by hash tables or other data structures
 * the function just returns the number of elements the object is composed of.
 *
 * Objects composed of single allocations are always reported as having a
 * single item even if they are actually logically composed of multiple
 * elements.
 *
 * For lists the function returns the number of elements in the quicklist
 * representing the list.
 *
 * For streams the function returns the number of rax nodes plus an estimate
 * of the allocations owned by the consumer groups (see below). */
size_t lazyfreeGetFreeEffort(robj *obj) {
    if (obj->type == OBJ_LIST) {
        quicklist *ql = obj->ptr;
        return ql->len;
    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
        zset *zs = obj->ptr;
        return zs->zsl->length;
    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_STREAM) {
        size_t effort = 0;
        stream *s = obj->ptr;

        /* Make a best effort estimate to maintain constant runtime. Every macro
         * node in the Stream is one allocation. */
        effort += s->rax->numnodes;

        /* Every consumer group is an allocation and so are the entries in its
         * PEL. We use size of the first group's PEL as an estimate for all
         * others.
         *
         * The groups rax may exist but be empty (e.g. after every group has
         * been destroyed), so we must also check its size: otherwise
         * raxNext() below returns 0 and the assertion aborts the server. */
        if (s->cgroups && raxSize(s->cgroups)) {
            raxIterator ri;
            streamCG *cg;
            raxStart(&ri,s->cgroups);
            raxSeek(&ri,"^",NULL,0);
            /* There must be at least one group so the following should always
             * work. */
            serverAssert(raxNext(&ri));
            cg = ri.data;
            effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
            raxStop(&ri);
        }
        return effort;
    } else {
        return 1; /* Everything else is a single allocation. */
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread.
 *
 * Returns 1 if the key was found and removed, 0 if it did not exist. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de == NULL) return 0;

    robj *val = dictGetVal(de);
    size_t free_effort = lazyfreeGetFreeEffort(val);

    /* If releasing the object is too much work, do it in the background
     * by adding the object to the lazy free list.
     * Note that if the object is shared, to reclaim it now it is not
     * possible. This rarely happens, however sometimes the implementation
     * of parts of the Redis core may call incrRefCount() to protect
     * objects, and then call dbDelete(). In this case we'll fall
     * through and reach the dictFreeUnlinkedEntry() call, that will be
     * equivalent to just calling decrRefCount(). */
    if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        /* Detach the value from the entry so that releasing the entry
         * below does not free it a second time. */
        dictSetVal(db->dict,de,NULL);
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    dictFreeUnlinkedEntry(db->dict,de);
    if (server.cluster_enabled) slotToKeyDel(key->ptr);
    return 1;
}

/* Free an object, if the object is huge enough, free it in async way.
 * Shared objects (refcount != 1) and small objects just get a synchronous
 * decrRefCount(). */
void freeObjAsync(robj *o) {
    size_t free_effort = lazyfreeGetFreeEffort(o);
    if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
    } else {
        decrRefCount(o);
    }
}

/* Empty a Redis DB asynchronously. What the function does actually is to
 * create a new empty set of hash tables and scheduling the old ones for
 * lazy freeing. The pending objects counter is increased by the number of
 * keys in the old main dictionary. */
void emptyDbAsync(redisDb *db) {
    dict *oldht1 = db->dict, *oldht2 = db->expires;
    db->dict = dictCreate(&dbDictType,NULL);
    db->expires = dictCreate(&keyptrDictType,NULL);
    atomicIncr(lazyfree_objects,dictSize(oldht1));
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}

/* Empty the slots-keys map of Redis Cluster by creating a new empty one
 * and scheduling the old for lazy freeing. The per-slot key counters are
 * reset to zero as well. */
void slotToKeyFlushAsync(void) {
    rax *old = server.cluster->slots_to_keys;

    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));
    atomicIncr(lazyfree_objects,old->numele);
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
}

/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}

/* Release a database from the lazyfree thread. 'ht1' and 'ht2' are the two
 * hash tables to release; presumably the old main and expires dictionaries
 * that emptyDbAsync() replaced with fresh ones in the main thread.
 *
 * The pending objects counter is decremented by the number of keys 'ht1'
 * held, sampled before the table is released. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
    size_t numkeys = dictSize(ht1);
    dictRelease(ht1);
    dictRelease(ht2);
    atomicDecr(lazyfree_objects,numkeys);
}

/* Release the radix tree mapping Redis Cluster keys to slots in the
 * lazyfree thread. The element count is read before freeing the tree so
 * the pending objects counter can be decremented by the same amount. */
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
    size_t len = rt->numele;
    raxFree(rt);
    atomicDecr(lazyfree_objects,len);
}