path: root/src/defrag.c
author     oranagra <oran@redislabs.com>  2016-12-30 03:37:52 +0200
committer  oranagra <oran@redislabs.com>  2016-12-30 03:37:52 +0200
commit     7aa9e6d2ae1d500d8ba900b239207143993ecc3e (patch)
tree       ed9d684b562250c09d9570c109f0630554b5af6c /src/defrag.c
parent     6712bce92c79de5c2caa38e9b597a3fa52fd497f (diff)
download   redis-7aa9e6d2ae1d500d8ba900b239207143993ecc3e.tar.gz
active memory defragmentation
Diffstat (limited to 'src/defrag.c')
-rw-r--r--  src/defrag.c  527
1 file changed, 527 insertions, 0 deletions
diff --git a/src/defrag.c b/src/defrag.c
new file mode 100644
index 000000000..663196c31
--- /dev/null
+++ b/src/defrag.c
@@ -0,0 +1,527 @@
+/*
+ * Active memory defragmentation
+ * Try to find key / value allocations that need to be re-allocated in order
+ * to reduce external fragmentation.
+ * We do that by scanning the keyspace and, for each pointer we have, asking
+ * the allocator whether moving it to a new address will help reduce
+ * fragmentation.
+ *
+ * Copyright (c) 2017, Oran Agra
+ * Copyright (c) 2017, Redis Labs, Inc
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include <time.h>
+#include <assert.h>
+#include <stddef.h>
+
+#if defined(USE_JEMALLOC) && defined(MALLOCX_TCACHE_NONE)
+
+/* this function was added to jemalloc in order to help us understand which
+ * pointers are worth moving and which aren't */
+int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
+
+/* Defrag helper for generic allocations.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void* activeDefragAlloc(void *ptr) {
+ int bin_util, run_util;
+ size_t size;
+ void *newptr;
+ if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) {
+ server.stat_active_defrag_misses++;
+ return NULL;
+ }
+    /* if this run is more utilized than the average utilization in this bin
+     * (or it is full), skip it. This will eventually move all the allocations
+     * from relatively empty runs into relatively full runs. */
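+    /* For example (illustrative numbers, with utilization scaled so that
+     * 1<<16 means 100%): a run at run_util=32768 (50%) in a bin averaging
+     * bin_util=49152 (75%) is worth evacuating, while run_util==1<<16 means
+     * the run is completely full and is skipped. */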
+ if (run_util > bin_util || run_util == 1<<16) {
+ server.stat_active_defrag_misses++;
+ return NULL;
+ }
+    /* move this allocation to a new allocation.
+     * make sure not to use the thread cache, so that we don't get back the
+     * same pointers we are trying to free. */
+ size = zmalloc_size(ptr);
+ newptr = zmalloc_no_tcache(size);
+ memcpy(newptr, ptr, size);
+ zfree_no_tcache(ptr);
+ return newptr;
+}
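+
+/* A minimal sketch of the typical caller pattern (illustrative only; 'myptr'
+ * is a hypothetical pointer, not part of this file):
+ *
+ *     void *newptr = activeDefragAlloc(myptr);
+ *     if (newptr)
+ *         myptr = newptr;    (the old pointer was freed, don't touch it)
+ */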
+
+/* Defrag helper for sds strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+sds activeDefragSds(sds sdsptr) {
+ void* ptr = sdsAllocPtr(sdsptr);
+ void* newptr = activeDefragAlloc(ptr);
+ if (newptr) {
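+        /* the sds pointer points past the sds header, into the middle of the
+         * allocation, so preserve its offset within the new allocation. */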
+ size_t offset = sdsptr - (char*)ptr;
+ sdsptr = (char*)newptr + offset;
+ return sdsptr;
+ }
+ return NULL;
+}
+
+/* Defrag helper for robj and/or string objects
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+robj *activeDefragStringOb(robj* ob) {
+ robj *ret = NULL;
+ if (ob->refcount!=1)
+ return NULL;
+
+    /* try to defrag the robj itself (only if it's not an EMBSTR type, which
+     * is handled below). */
+ if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
+ if ((ret = activeDefragAlloc(ob)))
+ ob = ret;
+ }
+
+ /* try to defrag string object */
+ if (ob->type == OBJ_STRING) {
+ if(ob->encoding==OBJ_ENCODING_RAW) {
+ sds newsds = activeDefragSds((sds)ob->ptr);
+ if (newsds) {
+ ob->ptr = newsds;
+                /* we don't need to change the return value here:
+                 * we could return NULL if 'ret' is still NULL, since the
+                 * object pointer itself wasn't changed, but we set it to ob
+                 * as an indication that we defragged a pointer (for stats).
+                 * NOTE: if ret is already set and the robj was moved too, the
+                 * stats will be a bit off, since two pointers were moved but
+                 * we report only one. */
+ ret = ob;
+ }
+ } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
+            /* the sds is embedded in the object allocation; calculate the
+             * offset and update the pointer in the new allocation. */
+ long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
+ if ((ret = activeDefragAlloc(ob))) {
+ ret->ptr = (void*)((intptr_t)ret + ofs);
+ }
+ } else if (ob->encoding!=OBJ_ENCODING_INT) {
+ serverPanic("Unknown string encoding");
+ }
+ }
+ return ret;
+}
+
+/* Defrag helper for dictEntries to be used during dict iteration (called on each step).
+ * returns a stat of how many pointers were moved. */
+int dictIterDefragEntry(dictIterator *iter) {
+    /* This function is a little bit dirty since it messes with the internals
+     * of the dict and its iterator, but the benefit is that it is very easy
+     * to use, and requires no other changes in the dict. */
+ int defragged = 0;
+ dictht *ht;
+ /* handle the next entry (if there is one), and update the pointer in the current entry. */
+ if (iter->nextEntry) {
+ dictEntry *newde = activeDefragAlloc(iter->nextEntry);
+ if (newde) {
+ defragged++;
+ iter->nextEntry = newde;
+ iter->entry->next = newde;
+ }
+ }
+ /* handle the case of the first entry in the hash bucket. */
+ ht = &iter->d->ht[iter->table];
+ if (ht->table[iter->index] == iter->entry) {
+ dictEntry *newde = activeDefragAlloc(iter->entry);
+ if (newde) {
+ iter->entry = newde;
+ ht->table[iter->index] = newde;
+ defragged++;
+ }
+ }
+ return defragged;
+}
+
+/* Defrag helper for dict main allocations (dict struct, and hash tables).
+ * receives a pointer to the dict* and implicitly updates it if the dict
+ * struct itself was moved.
+ * returns a stat of how many pointers were moved. */
+int dictDefragTables(dict** dictRef) {
+ dict *d = *dictRef;
+ dictEntry **newtable;
+ int defragged = 0;
+ /* handle the dict struct */
+ dict *newd = activeDefragAlloc(d);
+ if (newd)
+ defragged++, *dictRef = d = newd;
+ /* handle the first hash table */
+ newtable = activeDefragAlloc(d->ht[0].table);
+ if (newtable)
+ defragged++, d->ht[0].table = newtable;
+ /* handle the second hash table */
+ if (d->ht[1].table) {
+ newtable = activeDefragAlloc(d->ht[1].table);
+ if (newtable)
+ defragged++, d->ht[1].table = newtable;
+ }
+ return defragged;
+}
+
+/* Internal function used by zslDefrag */
+void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
+ int i;
+ for (i = 0; i < zsl->level; i++) {
+ if (update[i]->level[i].forward == oldnode)
+ update[i]->level[i].forward = newnode;
+ }
+ if (zsl->header==oldnode)
+ zsl->header = newnode;
+ if (zsl->tail==oldnode)
+ zsl->tail = newnode;
+ if (newnode->level[0].forward) {
+ serverAssert(newnode->level[0].forward->backward==oldnode);
+ newnode->level[0].forward->backward = newnode;
+ }
+}
+
+/* Defrag helper for sorted set.
+ * Update the sds pointer, defrag the struct and return the new score
+ * reference. We must not access the oldele pointer (not even the pointer
+ * stored in the skiplist), as it was already freed.
+ * newele may be NULL, in which case we only need to defrag the skiplist,
+ * without updating the sds pointer.
+ * When the return value is non-NULL, it is the score reference that must be
+ * updated in the dict record. */
+double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
+ zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
+ int i;
+ sds ele = newele? newele: oldele;
+
+ /* find the skiplist node referring to the object that was moved,
+     * and all the pointers that need to be updated if we end up moving the
+     * skiplist node. */
+ x = zsl->header;
+ for (i = zsl->level-1; i >= 0; i--) {
+ while (x->level[i].forward &&
+               x->level[i].forward->ele != oldele && /* make sure not to access the ->ele pointer if it matches oldele */
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ sdscmp(x->level[i].forward->ele,ele) < 0)))
+ x = x->level[i].forward;
+ update[i] = x;
+ }
+
+    /* update the sds pointer inside the skiplist record. */
+ x = x->level[0].forward;
+ serverAssert(x && score == x->score && x->ele==oldele);
+ if (newele)
+ x->ele = newele;
+
+ /* try to defrag the skiplist record itself */
+ newx = activeDefragAlloc(x);
+ if (newx) {
+ zslUpdateNode(zsl, x, newx, update);
+ return &newx->score;
+ }
+ return NULL;
+}
+
+/* for each key we scan in the main dict, this function will attempt to defrag
+ * all the various pointers it has.
+ * returns a stat of how many pointers were moved. */
+int defragKey(redisDb *db, dictEntry *de) {
+ sds keysds = dictGetKey(de);
+ robj *newob, *ob;
+ unsigned char *newzl;
+ dict *d;
+ dictIterator *di;
+ int defragged = 0;
+ sds newsds;
+
+ /* try to defrag the key name */
+ newsds = activeDefragSds(keysds);
+ if (newsds) {
+ de->key = newsds;
+ if (dictSize(db->expires)) {
+            /* Dirty code:
+             * we can't search db->expires for that key after we've already
+             * released the pointer it holds (the string compare would touch
+             * freed memory), so we locate the entry by hash and replace the
+             * key pointer directly. */
+ unsigned int hash = dictGetHash(db->dict, newsds);
+ dictReplaceKeyPtr(db->expires, keysds, newsds, hash);
+ }
+ defragged++;
+ }
+
+ /* try to defrag robj and / or string value */
+ ob = dictGetVal(de);
+ if ((newob = activeDefragStringOb(ob))) {
+ de->v.val = newob;
+ ob = newob;
+ defragged++;
+ }
+
+ if (ob->type == OBJ_STRING) {
+ /* already handled in activeDefragStringOb */
+ } else if (ob->type == OBJ_LIST) {
+ if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
+ quicklist *ql = ob->ptr, *newql;
+ quicklistNode *node = ql->head, *newnode;
+ if ((newql = activeDefragAlloc(ql)))
+ defragged++, ob->ptr = ql = newql;
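+            /* walk all the quicklist nodes; when a node moves, relink its
+             * neighbors (or the list head/tail), then defrag the node's
+             * ziplist as well. */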
+ do {
+ if ((newnode = activeDefragAlloc(node))) {
+ if (newnode->prev)
+ newnode->prev->next = newnode;
+ else
+ ql->head = newnode;
+ if (newnode->next)
+ newnode->next->prev = newnode;
+ else
+ ql->tail = newnode;
+ node = newnode;
+ defragged++;
+ }
+ if ((newzl = activeDefragAlloc(node->zl)))
+ defragged++, node->zl = newzl;
+ } while ((node = node->next));
+ } else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ defragged++, ob->ptr = newzl;
+ } else {
+ serverPanic("Unknown list encoding");
+ }
+ } else if (ob->type == OBJ_SET) {
+ if (ob->encoding == OBJ_ENCODING_HT) {
+ d = ob->ptr;
+ di = dictGetIterator(d);
+ while((de = dictNext(di)) != NULL) {
+ sds sdsele = dictGetKey(de);
+ if ((newsds = activeDefragSds(sdsele)))
+ defragged++, de->key = newsds;
+ defragged += dictIterDefragEntry(di);
+ }
+ dictReleaseIterator(di);
+            defragged += dictDefragTables((dict**)&ob->ptr);
+ } else if (ob->encoding == OBJ_ENCODING_INTSET) {
+ intset *is = ob->ptr;
+ intset *newis = activeDefragAlloc(is);
+ if (newis)
+ defragged++, ob->ptr = newis;
+ } else {
+ serverPanic("Unknown set encoding");
+ }
+ } else if (ob->type == OBJ_ZSET) {
+ if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ defragged++, ob->ptr = newzl;
+ } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
+ zset *zs = (zset*)ob->ptr;
+ zset *newzs = activeDefragAlloc(zs);
+ zskiplist *newzsl;
+ if (newzs)
+ defragged++, ob->ptr = zs = newzs;
+ newzsl = activeDefragAlloc(zs->zsl);
+ if (newzsl)
+ defragged++, zs->zsl = newzsl;
+ d = zs->dict;
+ di = dictGetIterator(d);
+ while((de = dictNext(di)) != NULL) {
+ double* newscore;
+ sds sdsele = dictGetKey(de);
+ if ((newsds = activeDefragSds(sdsele)))
+ defragged++, de->key = newsds;
+ newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
+ if (newscore) {
+ dictSetVal(d, de, newscore);
+ defragged++;
+ }
+ defragged += dictIterDefragEntry(di);
+ }
+ dictReleaseIterator(di);
+            defragged += dictDefragTables(&zs->dict);
+ } else {
+ serverPanic("Unknown sorted set encoding");
+ }
+ } else if (ob->type == OBJ_HASH) {
+ if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ defragged++, ob->ptr = newzl;
+ } else if (ob->encoding == OBJ_ENCODING_HT) {
+ d = ob->ptr;
+ di = dictGetIterator(d);
+ while((de = dictNext(di)) != NULL) {
+ sds sdsele = dictGetKey(de);
+ if ((newsds = activeDefragSds(sdsele)))
+ defragged++, de->key = newsds;
+ sdsele = dictGetVal(de);
+ if ((newsds = activeDefragSds(sdsele)))
+ defragged++, de->v.val = newsds;
+ defragged += dictIterDefragEntry(di);
+ }
+ dictReleaseIterator(di);
+            defragged += dictDefragTables((dict**)&ob->ptr);
+ } else {
+ serverPanic("Unknown hash encoding");
+ }
+ } else {
+ serverPanic("Unknown object type");
+ }
+ return defragged;
+}
+
+/* defrag scan callback for the main db dictionary */
+void defragScanCallback(void *privdata, const dictEntry *de) {
+    /* TODO: defrag the dictEntry (and also the entry in the expires dict). */
+    int defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
+ server.stat_active_defrag_hits += defragged;
+ if(defragged)
+ server.stat_active_defrag_key_hits++;
+ else
+ server.stat_active_defrag_key_misses++;
+}
+
+/* defrag scan callback for each hash table bucket,
+ * used in order to defrag the dictEntry allocations */
+void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
+ UNUSED(privdata);
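+    /* while walking the chain, bucketref points either at the bucket head or
+     * at the previous entry's 'next' pointer, so assigning *bucketref relinks
+     * the chain when an entry moves. */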
+ while(*bucketref) {
+ dictEntry *de = *bucketref, *newde;
+ if ((newde = activeDefragAlloc(de))) {
+ *bucketref = newde;
+ }
+ bucketref = &(*bucketref)->next;
+ }
+}
+
+/* Utility function to get the fragmentation ratio from jemalloc.
+ * It is critical to do that by comparing only heap maps that belong to
+ * jemalloc, and skip the ones jemalloc keeps as spare. Since we use this
+ * fragmentation ratio in order to decide if a defrag action should be taken
+ * or not, a false detection can cause the defragmenter to waste a lot of CPU
+ * without the possibility of getting any results. */
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+ size_t epoch = 1, allocated = 0, resident = 0, active = 0, sz = sizeof(size_t);
+ je_mallctl("epoch", &epoch, &sz, &epoch, sz); /* Update the statistics cached by mallctl. */
+ je_mallctl("stats.resident", &resident, &sz, NULL, 0); /* unlike RSS, this does not include RSS from shared libraries and other non heap mappings */
+ je_mallctl("stats.active", &active, &sz, NULL, 0); /* unlike resident, this doesn't not include the pages jemalloc reserves for re-use (purge will clean that) */
+ je_mallctl("stats.allocated", &allocated, &sz, NULL, 0); /* unlike zmalloc_used_memory, this matches the stats.resident by taking into account all allocations done by this process (not only zmalloc) */
+ float frag_pct = ((float)active / allocated)*100 - 100;
+ size_t frag_bytes = active - allocated;
+ float rss_pct = ((float)resident / allocated)*100 - 100;
+ size_t rss_bytes = resident - allocated;
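+    /* Example (illustrative numbers): allocated=100MB and active=130MB gives
+     * frag_pct=30 and frag_bytes=30MB; resident=150MB would give rss_pct=50. */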
+ if(out_frag_bytes)
+ *out_frag_bytes = frag_bytes;
+ serverLog(LL_DEBUG,
+ "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu%% rss)",
+ allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
+ return frag_pct;
+}
+
+#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
+#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
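+
+/* Example (illustrative numbers): INTERPOLATE(50, 10, 100, 5, 75) yields
+ * 5 + (50-10)*(75-5)/(100-10), roughly 36, i.e. 50% fragmentation between the
+ * 10% and 100% thresholds maps to about 36% CPU between the 5%..75% cycle
+ * limits, and LIMIT then clamps the result into that range. */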
+
+/* Perform incremental defragmentation work from the serverCron.
+ * This works in a similar way to activeExpireCycle, in the sense that
+ * we do incremental work across calls. */
+void activeDefragCycle(void) {
+ static int current_db = -1;
+ static unsigned long cursor = 0;
+ static redisDb *db = NULL;
+ static long long start_scan, start_stat;
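+    /* the static 'current_db', 'cursor' and 'db' persist between calls, so a
+     * single keyspace scan can be spread over many cron cycles; 'start_scan'
+     * and 'start_stat' record when the current full pass began and the hits
+     * counter at that point. */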
+ unsigned int iterations = 0;
+ unsigned long long defragged = server.stat_active_defrag_hits;
+ long long start, timelimit;
+
+ if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
+        return; /* defragging memory while there's a fork will just do damage (it defeats copy-on-write). */
+
+    /* once a second, check if the fragmentation justifies starting a scan
+     * or making it more aggressive. */
+ run_with_period(1000) {
+ size_t frag_bytes;
+ float frag_pct = getAllocatorFragmentation(&frag_bytes);
+ /* if we're not already running, and below the threshold, exit. */
+ if (!server.active_defrag_running) {
+ if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
+ return;
+ }
+
+ /* calculate the adaptive aggressiveness of the defrag */
+ int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper,
+ server.active_defrag_cycle_min, server.active_defrag_cycle_max);
+ cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max);
+ /* we allow increasing the aggressiveness during a scan, but don't reduce it */
+ if (!server.active_defrag_running || cpu_pct > server.active_defrag_running) {
+ server.active_defrag_running = cpu_pct;
+ serverLog(LL_VERBOSE,
+ "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+ frag_pct, frag_bytes, cpu_pct);
+ }
+ }
+ if (!server.active_defrag_running)
+ return;
+
+ /* See activeExpireCycle for how timelimit is handled. */
+ start = ustime();
+ timelimit = 1000000*server.active_defrag_running/server.hz/100;
+ if (timelimit <= 0) timelimit = 1;
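+    /* Example (illustrative numbers): active_defrag_running=25 (25% CPU) with
+     * hz=10 gives 1000000*25/10/100 = 25000 us, i.e. 25ms of defrag work per
+     * 100ms serverCron period. */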
+
+ do {
+ if (!cursor) {
+ /* Move on to next database, and stop if we reached the last one */
+ if (++current_db >= server.dbnum) {
+ long long now = ustime();
+ size_t frag_bytes;
+ float frag_pct = getAllocatorFragmentation(&frag_bytes);
+ serverLog(LL_VERBOSE,
+ "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
+ (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);
+
+ start_scan = now;
+ current_db = -1;
+ cursor = 0;
+ db = NULL;
+ server.active_defrag_running = 0;
+ return;
+ }
+ else if (current_db==0) {
+ /* start a scan from the first database */
+ start_scan = ustime();
+ start_stat = server.stat_active_defrag_hits;
+ }
+
+ db = &server.db[current_db];
+ cursor = 0;
+ }
+
+ do {
+ cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
+            /* once in 16 scan iterations, or 1000 pointer reallocations (if
+             * we have a lot of pointers in one hash bucket), check if we
+             * reached the time limit. */
+ if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) {
+ if ((ustime() - start) > timelimit) {
+ return;
+ }
+ iterations = 0;
+ defragged = server.stat_active_defrag_hits;
+ }
+ } while(cursor);
+ } while(1);
+}
+
+#else /* USE_JEMALLOC && MALLOCX_TCACHE_NONE */
+
+void activeDefragCycle(void) {
+    /* not implemented yet */
+}
+
+#endif
\ No newline at end of file