From 7aa9e6d2ae1d500d8ba900b239207143993ecc3e Mon Sep 17 00:00:00 2001
From: oranagra
Date: Fri, 30 Dec 2016 03:37:52 +0200
Subject: active memory defragmentation

---
 deps/jemalloc/src/jemalloc.c |  32 +++
 redis.conf                   |  20 ++
 src/Makefile                 |   2 +-
 src/config.c                 |  58 +++++
 src/db.c                     |   2 +-
 src/debug.c                  |  15 +-
 src/defrag.c                 | 527 +++++++++++++++++++++++++++++++++++++++++++
 src/dict.c                   |  33 +++
 src/dict.h                   |   5 +-
 src/server.c                 |  29 ++-
 src/server.h                 |  18 ++
 src/zmalloc.c                |  20 ++
 src/zmalloc.h                |   2 +
 13 files changed, 755 insertions(+), 8 deletions(-)
 create mode 100644 src/defrag.c

diff --git a/deps/jemalloc/src/jemalloc.c b/deps/jemalloc/src/jemalloc.c
index 5a2d32406..fe77c2475 100644
--- a/deps/jemalloc/src/jemalloc.c
+++ b/deps/jemalloc/src/jemalloc.c
@@ -2591,3 +2591,35 @@ jemalloc_postfork_child(void)
 }
 
 /******************************************************************************/
+
+/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation.
+ * returns 0 if the allocation is in the currently active run,
+ * or when it is not causing any frag issue (large or huge bin)
+ * returns the bin utilization and run utilization both in fixed point 16:16.
+ * If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */
+JEMALLOC_EXPORT int JEMALLOC_NOTHROW
+je_get_defrag_hint(void* ptr, int *bin_util, int *run_util) {
+    int defrag = 0;
+    arena_chunk_t *chunk = (arena_chunk_t *)CHUNK_ADDR2BASE(ptr);
+    if (likely(chunk != ptr)) { /* indication that this is not a HUGE alloc */
+        size_t pageind = ((uintptr_t)ptr - (uintptr_t)chunk) >> LG_PAGE;
+        size_t mapbits = arena_mapbits_get(chunk, pageind);
+        if (likely((mapbits & CHUNK_MAP_LARGE) == 0)) { /* indication that this is not a LARGE alloc */
+            arena_t *arena = extent_node_arena_get(&chunk->node);
+            size_t rpages_ind = pageind - arena_mapbits_small_runind_get(chunk, pageind);
+            arena_run_t *run = &arena_miscelm_get(chunk, rpages_ind)->run;
+            arena_bin_t *bin = &arena->bins[run->binind];
+            malloc_mutex_lock(&bin->lock);
+            /* runs that are in the same chunk as the current run are likely
+             * to become the next current run, so don't bother moving them */
+            if (chunk != (arena_chunk_t *)CHUNK_ADDR2BASE(bin->runcur)) {
+                arena_bin_info_t *bin_info = &arena_bin_info[run->binind];
+                size_t availregs = bin_info->nregs * bin->stats.curruns;
+                *bin_util = (bin->stats.curregs<<16) / availregs;
+                *run_util = ((bin_info->nregs - run->nfree)<<16) / bin_info->nregs;
+                defrag = 1;
+            }
+            malloc_mutex_unlock(&bin->lock);
+        }
+    }
+    return defrag;
+}
diff --git a/redis.conf b/redis.conf
index 14b5ca979..18ba9fb3f 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1228,3 +1228,23 @@ aof-rewrite-incremental-fsync yes
 #
 # lfu-log-factor 10
 # lfu-decay-time 1
+
+########################### ACTIVE DEFRAGMENTATION #######################
+
+# Enable active defragmentation
+# activedefrag yes
+
+# minimum amount of fragmentation waste to start active defrag
+# active-defrag-ignore-bytes 100mb
+
+# minimum percentage of fragmentation to start active defrag
+# active-defrag-threshold-lower 10
+
+# maximum percentage of fragmentation at which we use maximum effort
+# active-defrag-threshold-upper 100
+
+# minimal effort for defrag in CPU percentage
+# active-defrag-cycle-min 25
+
+# maximal effort for defrag in CPU percentage
+# active-defrag-cycle-max 75
diff --git a/src/Makefile b/src/Makefile
index 2bf3c9347..3f445f40f 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -128,7 +128,7 @@ endif
 
 REDIS_SERVER_NAME=redis-server
 REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o
 REDIS_CLI_NAME=redis-cli
 REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
 REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/config.c b/src/config.c
index 54af5bfe0..4e2c74e5d 100644
--- a/src/config.c
+++ b/src/config.c
@@ -423,6 +423,10 @@ void loadServerConfigFromString(char *config) {
             if ((server.repl_slave_lazy_flush = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"activedefrag") && argc == 2) {
+            if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) {
+                err = "argument must be 'yes' or 'no'"; goto loaderr;
+            }
         } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
             if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -499,6 +503,36 @@ void loadServerConfigFromString(char *config) {
             }
             zfree(server.rdb_filename);
             server.rdb_filename = zstrdup(argv[1]);
+        } else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) {
+            server.active_defrag_threshold_lower = atoi(argv[1]);
+            if (server.active_defrag_threshold_lower < 0) {
+                err = "active-defrag-threshold-lower must be 0 or greater";
+                goto loaderr;
+            }
+        } else if (!strcasecmp(argv[0],"active-defrag-threshold-upper") && argc == 2) {
+            server.active_defrag_threshold_upper = atoi(argv[1]);
+            if (server.active_defrag_threshold_upper < 0) {
+                err = "active-defrag-threshold-upper must be 0 or greater";
+                goto loaderr;
+            }
+        } else if (!strcasecmp(argv[0],"active-defrag-ignore-bytes") && argc == 2) {
+            server.active_defrag_ignore_bytes = memtoll(argv[1], NULL);
+            if (server.active_defrag_ignore_bytes <= 0) {
+                err = "active-defrag-ignore-bytes must be above 0";
+                goto loaderr;
+            }
+        } else if (!strcasecmp(argv[0],"active-defrag-cycle-min") && argc == 2) {
+            server.active_defrag_cycle_min = atoi(argv[1]);
+            if (server.active_defrag_cycle_min < 1 || server.active_defrag_cycle_min > 99) {
+                err = "active-defrag-cycle-min must be between 1 and 99";
+                goto loaderr;
+            }
+        } else if (!strcasecmp(argv[0],"active-defrag-cycle-max") && argc == 2) {
+            server.active_defrag_cycle_max = atoi(argv[1]);
+            if (server.active_defrag_cycle_max < 1 || server.active_defrag_cycle_max > 99) {
+                err = "active-defrag-cycle-max must be between 1 and 99";
+                goto loaderr;
+            }
         } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
             server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
         } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
@@ -971,6 +1005,8 @@ void configSetCommand(client *c) {
         "slave-read-only",server.repl_slave_ro) {
     } config_set_bool_field(
         "activerehashing",server.activerehashing) {
+    } config_set_bool_field(
+        "activedefrag",server.active_defrag_enabled) {
     } config_set_bool_field(
         "protected-mode",server.protected_mode) {
     } config_set_bool_field(
@@ -998,6 +1034,16 @@ void configSetCommand(client *c) {
         "lfu-decay-time",server.lfu_decay_time,0,LLONG_MAX) {
     } config_set_numerical_field(
         "timeout",server.maxidletime,0,LONG_MAX) {
+    } config_set_numerical_field(
+        "active-defrag-threshold-lower",server.active_defrag_threshold_lower,0,1000) {
+    } config_set_numerical_field(
+        "active-defrag-threshold-upper",server.active_defrag_threshold_upper,0,1000) {
+    } config_set_memory_field(
+        "active-defrag-ignore-bytes",server.active_defrag_ignore_bytes) {
+    } config_set_numerical_field(
+        "active-defrag-cycle-min",server.active_defrag_cycle_min,1,99) {
+    } config_set_numerical_field(
+        "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) {
     } config_set_numerical_field(
         "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){
     } config_set_numerical_field(
@@ -1166,6 +1212,11 @@ void configGetCommand(client *c) {
     config_get_numerical_field("maxmemory",server.maxmemory);
     config_get_numerical_field("maxmemory-samples",server.maxmemory_samples);
     config_get_numerical_field("timeout",server.maxidletime);
+    config_get_numerical_field("active-defrag-threshold-lower",server.active_defrag_threshold_lower);
+    config_get_numerical_field("active-defrag-threshold-upper",server.active_defrag_threshold_upper);
+    config_get_numerical_field("active-defrag-ignore-bytes",server.active_defrag_ignore_bytes);
+    config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min);
+    config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max);
     config_get_numerical_field("auto-aof-rewrite-percentage",
             server.aof_rewrite_perc);
     config_get_numerical_field("auto-aof-rewrite-min-size",
@@ -1230,6 +1281,7 @@ void configGetCommand(client *c) {
     config_get_bool_field("rdbcompression", server.rdb_compression);
     config_get_bool_field("rdbchecksum", server.rdb_checksum);
     config_get_bool_field("activerehashing", server.activerehashing);
+    config_get_bool_field("activedefrag", server.active_defrag_enabled);
     config_get_bool_field("protected-mode", server.protected_mode);
     config_get_bool_field("repl-disable-tcp-nodelay",
             server.repl_disable_tcp_nodelay);
@@ -1930,6 +1982,11 @@ int rewriteConfig(char *path) {
     rewriteConfigBytesOption(state,"maxmemory",server.maxmemory,CONFIG_DEFAULT_MAXMEMORY);
     rewriteConfigEnumOption(state,"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum,CONFIG_DEFAULT_MAXMEMORY_POLICY);
     rewriteConfigNumericalOption(state,"maxmemory-samples",server.maxmemory_samples,CONFIG_DEFAULT_MAXMEMORY_SAMPLES);
+    rewriteConfigNumericalOption(state,"active-defrag-threshold-lower",server.active_defrag_threshold_lower,CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER);
+    rewriteConfigNumericalOption(state,"active-defrag-threshold-upper",server.active_defrag_threshold_upper,CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER);
+    rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
+    rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
+    rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
     rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
     rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
     rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
@@ -1956,6 +2013,7 @@ int rewriteConfig(char *path) {
     rewriteConfigNumericalOption(state,"zset-max-ziplist-value",server.zset_max_ziplist_value,OBJ_ZSET_MAX_ZIPLIST_VALUE);
     rewriteConfigNumericalOption(state,"hll-sparse-max-bytes",server.hll_sparse_max_bytes,CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES);
     rewriteConfigYesNoOption(state,"activerehashing",server.activerehashing,CONFIG_DEFAULT_ACTIVE_REHASHING);
+    rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG);
     rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE);
     rewriteConfigClientoutputbufferlimitOption(state);
     rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
diff --git a/src/db.c b/src/db.c
index 90a75fcfe..a21437c76 100644
--- a/src/db.c
+++ b/src/db.c
@@ -665,7 +665,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
         privdata[0] = keys;
         privdata[1] = o;
         do {
-            cursor = dictScan(ht, cursor, scanCallback, privdata);
+            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
         } while (cursor &&
               maxiterations-- &&
              listLength(keys) < (unsigned long)count);
diff --git a/src/debug.c b/src/debug.c
index b8ad4e511..5098d2b64 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -282,7 +282,7 @@ void debugCommand(client *c) {
         blen++; addReplyStatus(c,
         "ziplist <key> -- Show low level info about the ziplist encoding.");
         blen++; addReplyStatus(c,
-        "populate <count> [prefix] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.");
+        "populate <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified it is used instead of the 'key' prefix.");
         blen++; addReplyStatus(c,
         "digest -- Outputs an hex signature representing the current DB content.");
         blen++; addReplyStatus(c,
@@ -433,7 +433,7 @@ void debugCommand(client *c) {
            addReplyStatus(c,"Ziplist structure printed on stdout");
         }
     } else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
-               (c->argc == 3 || c->argc == 4)) {
+               c->argc >= 3 && c->argc <= 5) {
         long keys, j;
         robj *key, *val;
         char buf[128];
@@ -442,15 +442,24 @@ void debugCommand(client *c) {
             return;
         dictExpand(c->db->dict,keys);
         for (j = 0; j < keys; j++) {
+            long valsize = 0;
             snprintf(buf,sizeof(buf),"%s:%lu",
                 (c->argc == 3) ? "key" : (char*)c->argv[3]->ptr, j);
             key = createStringObject(buf,strlen(buf));
+            if (c->argc == 5)
+                if (getLongFromObjectOrReply(c, c->argv[4], &valsize, NULL) != C_OK)
+                    return;
             if (lookupKeyWrite(c->db,key) != NULL) {
                 decrRefCount(key);
                 continue;
             }
             snprintf(buf,sizeof(buf),"value:%lu",j);
-            val = createStringObject(buf,strlen(buf));
+            if (valsize==0)
+                val = createStringObject(buf,strlen(buf));
+            else {
+                val = createStringObject(NULL,valsize);
+                memset(val->ptr, 0, valsize);
+            }
             dbAdd(c->db,key,val);
             signalModifiedKey(c->db,key);
             decrRefCount(key);
diff --git a/src/defrag.c b/src/defrag.c
new file mode 100644
index 000000000..663196c31
--- /dev/null
+++ b/src/defrag.c
@@ -0,0 +1,527 @@
+/*
+ * Active memory defragmentation
+ * Try to find key / value allocations that need to be re-allocated in order
+ * to reduce external fragmentation.
+ * We do that by scanning the keyspace and for each pointer we have, we can try to
+ * ask the allocator if moving it to a new address will help reduce fragmentation.
+ *
+ * Copyright (c) 2017, Oran Agra
+ * Copyright (c) 2017, Redis Labs, Inc
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include <time.h>
+#include <assert.h>
+#include <stddef.h>
+
+#if defined(USE_JEMALLOC) && defined(MALLOCX_TCACHE_NONE)
+
+/* this method was added to jemalloc in order to help us understand which
+ * pointers are worthwhile moving and which aren't */
+int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
+
+/* Defrag helper for generic allocations.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void* activeDefragAlloc(void *ptr) {
+    int bin_util, run_util;
+    size_t size;
+    void *newptr;
+    if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) {
+        server.stat_active_defrag_misses++;
+        return NULL;
+    }
+    /* if this run is more utilized than the average utilization in this bin (or it is full), skip it.
+     * this will eventually move all the allocations from relatively empty runs into relatively full runs. */
+    if (run_util > bin_util || run_util == 1<<16) {
+        server.stat_active_defrag_misses++;
+        return NULL;
+    }
+    /* move this allocation to a new allocation.
+     * make sure not to use the thread cache, so that we don't get back the same pointers we try to free */
+    size = zmalloc_size(ptr);
+    newptr = zmalloc_no_tcache(size);
+    memcpy(newptr, ptr, size);
+    zfree_no_tcache(ptr);
+    return newptr;
+}
+
+/* Defrag helper for sds strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+sds activeDefragSds(sds sdsptr) {
+    void* ptr = sdsAllocPtr(sdsptr);
+    void* newptr = activeDefragAlloc(ptr);
+    if (newptr) {
+        size_t offset = sdsptr - (char*)ptr;
+        sdsptr = (char*)newptr + offset;
+        return sdsptr;
+    }
+    return NULL;
+}
+
+/* Defrag helper for robj and/or string objects
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+robj *activeDefragStringOb(robj* ob) {
+    robj *ret = NULL;
+    if (ob->refcount!=1)
+        return NULL;
+
+    /* try to defrag robj (only if not an EMBSTR type, which is handled below) */
+    if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
+        if ((ret = activeDefragAlloc(ob)))
+            ob = ret;
+    }
+
+    /* try to defrag string object */
+    if (ob->type == OBJ_STRING) {
+        if(ob->encoding==OBJ_ENCODING_RAW) {
+            sds newsds = activeDefragSds((sds)ob->ptr);
+            if (newsds) {
+                ob->ptr = newsds;
+                /* we don't need to change the return value here.
+                 * we can return NULL if 'ret' is still NULL (since the object pointer itself wasn't changed).
+                 * but we set the return value to ob as an indication that we defragged a pointer (for stats).
+                 * NOTE: if ret is already set and the robj was moved, then our stats will be a bit off
+                 * since two pointers were moved, but we show only one in the stats */
+                ret = ob;
+            }
+        } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
+            /* the sds is embedded in the object allocation, calculate the offset and update the pointer in the new allocation */
+            long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
+            if ((ret = activeDefragAlloc(ob))) {
+                ret->ptr = (void*)((intptr_t)ret + ofs);
+            }
+        } else if (ob->encoding!=OBJ_ENCODING_INT) {
+            serverPanic("Unknown string encoding");
+        }
+    }
+    return ret;
+}
+
+/* Defrag helper for dictEntries to be used during dict iteration (called on each step).
+ * returns a stat of how many pointers were moved. */
+int dictIterDefragEntry(dictIterator *iter) {
+    /* This function is a little bit dirty since it messes with the internals of the dict and its iterator,
+     * but the benefit is that it is very easy to use, and requires no other changes in the dict. */
+    int defragged = 0;
+    dictht *ht;
+    /* handle the next entry (if there is one), and update the pointer in the current entry. */
+    if (iter->nextEntry) {
+        dictEntry *newde = activeDefragAlloc(iter->nextEntry);
+        if (newde) {
+            defragged++;
+            iter->nextEntry = newde;
+            iter->entry->next = newde;
+        }
+    }
+    /* handle the case of the first entry in the hash bucket. */
+    ht = &iter->d->ht[iter->table];
+    if (ht->table[iter->index] == iter->entry) {
+        dictEntry *newde = activeDefragAlloc(iter->entry);
+        if (newde) {
+            iter->entry = newde;
+            ht->table[iter->index] = newde;
+            defragged++;
+        }
+    }
+    return defragged;
+}
+
+/* Defrag helper for dict main allocations (dict struct, and hash tables).
+ * receives a pointer to the dict* and implicitly updates it when the dict struct itself was moved.
+ * returns a stat of how many pointers were moved. */
+int dictDefragTables(dict** dictRef) {
+    dict *d = *dictRef;
+    dictEntry **newtable;
+    int defragged = 0;
+    /* handle the dict struct */
+    dict *newd = activeDefragAlloc(d);
+    if (newd)
+        defragged++, *dictRef = d = newd;
+    /* handle the first hash table */
+    newtable = activeDefragAlloc(d->ht[0].table);
+    if (newtable)
+        defragged++, d->ht[0].table = newtable;
+    /* handle the second hash table */
+    if (d->ht[1].table) {
+        newtable = activeDefragAlloc(d->ht[1].table);
+        if (newtable)
+            defragged++, d->ht[1].table = newtable;
+    }
+    return defragged;
+}
+
+/* Internal function used by zslDefrag */
+void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
+    int i;
+    for (i = 0; i < zsl->level; i++) {
+        if (update[i]->level[i].forward == oldnode)
+            update[i]->level[i].forward = newnode;
+    }
+    if (zsl->header==oldnode)
+        zsl->header = newnode;
+    if (zsl->tail==oldnode)
+        zsl->tail = newnode;
+    if (newnode->level[0].forward) {
+        serverAssert(newnode->level[0].forward->backward==oldnode);
+        newnode->level[0].forward->backward = newnode;
+    }
+}
+
+/* Defrag helper for sorted set.
+ * Update the robj pointer, defrag the struct and return the new score reference.
+ * we may not access the oldele pointer (not even the pointer stored in the skiplist), as it was already freed.
+ * newele may be null, in which case we only need to defrag the skiplist, but not update the obj pointer.
+ * when the return value is non-NULL, it is the score reference that must be updated in the dict record. */
+double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
+    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
+    int i;
+    sds ele = newele? newele: oldele;
+
+    /* find the skiplist node referring to the object that was moved,
+     * and all pointers that need to be updated if we'll end up moving the skiplist node. */
+    x = zsl->header;
+    for (i = zsl->level-1; i >= 0; i--) {
+        while (x->level[i].forward &&
+            x->level[i].forward->ele != oldele && /* make sure not to access the ->obj pointer if it matches oldele */
+            (x->level[i].forward->score < score ||
+                (x->level[i].forward->score == score &&
+                sdscmp(x->level[i].forward->ele,ele) < 0)))
+            x = x->level[i].forward;
+        update[i] = x;
+    }
+
+    /* update the robj pointer inside the skip list record. */
+    x = x->level[0].forward;
+    serverAssert(x && score == x->score && x->ele==oldele);
+    if (newele)
+        x->ele = newele;
+
+    /* try to defrag the skiplist record itself */
+    newx = activeDefragAlloc(x);
+    if (newx) {
+        zslUpdateNode(zsl, x, newx, update);
+        return &newx->score;
+    }
+    return NULL;
+}
+
+/* for each key we scan in the main dict, this function will attempt to defrag all the various pointers it has.
+ * returns a stat of how many pointers were moved. */
+int defragKey(redisDb *db, dictEntry *de) {
+    sds keysds = dictGetKey(de);
+    robj *newob, *ob;
+    unsigned char *newzl;
+    dict *d;
+    dictIterator *di;
+    int defragged = 0;
+    sds newsds;
+
+    /* try to defrag the key name */
+    newsds = activeDefragSds(keysds);
+    if (newsds) {
+        de->key = newsds;
+        if (dictSize(db->expires)) {
+            /* Dirty code:
+             * I can't search in db->expires for that key after I've already released the pointer it holds,
+             * since it won't be able to do the string comparison */
+            unsigned int hash = dictGetHash(db->dict, newsds);
+            dictReplaceKeyPtr(db->expires, keysds, newsds, hash);
+        }
+        defragged++;
+    }
+
+    /* try to defrag robj and / or string value */
+    ob = dictGetVal(de);
+    if ((newob = activeDefragStringOb(ob))) {
+        de->v.val = newob;
+        ob = newob;
+        defragged++;
+    }
+
+    if (ob->type == OBJ_STRING) {
+        /* already handled in activeDefragStringOb */
+    } else if (ob->type == OBJ_LIST) {
+        if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
+            quicklist *ql = ob->ptr, *newql;
+            quicklistNode *node = ql->head, *newnode;
+            if ((newql = activeDefragAlloc(ql)))
+                defragged++, ob->ptr = ql = newql;
+            do {
+                if ((newnode = activeDefragAlloc(node))) {
+                    if (newnode->prev)
+                        newnode->prev->next = newnode;
+                    else
+                        ql->head = newnode;
+                    if (newnode->next)
+                        newnode->next->prev = newnode;
+                    else
+                        ql->tail = newnode;
+                    node = newnode;
+                    defragged++;
+                }
+                if ((newzl = activeDefragAlloc(node->zl)))
+                    defragged++, node->zl = newzl;
+            } while ((node = node->next));
+        } else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                defragged++, ob->ptr = newzl;
+        } else {
+            serverPanic("Unknown list encoding");
+        }
+    } else if (ob->type == OBJ_SET) {
+        if (ob->encoding == OBJ_ENCODING_HT) {
+            d = ob->ptr;
+            di = dictGetIterator(d);
+            while((de = dictNext(di)) != NULL) {
+                sds sdsele = dictGetKey(de);
+                if ((newsds = activeDefragSds(sdsele)))
+                    defragged++, de->key = newsds;
+                defragged += dictIterDefragEntry(di);
+            }
+            dictReleaseIterator(di);
+            dictDefragTables((dict**)&ob->ptr);
+        } else if (ob->encoding == OBJ_ENCODING_INTSET) {
+            intset *is = ob->ptr;
+            intset *newis = activeDefragAlloc(is);
+            if (newis)
+                defragged++, ob->ptr = newis;
+        } else {
+            serverPanic("Unknown set encoding");
+        }
+    } else if (ob->type == OBJ_ZSET) {
+        if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                defragged++, ob->ptr = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
+            zset *zs = (zset*)ob->ptr;
+            zset *newzs = activeDefragAlloc(zs);
+            zskiplist *newzsl;
+            if (newzs)
+                defragged++, ob->ptr = zs = newzs;
+            newzsl = activeDefragAlloc(zs->zsl);
+            if (newzsl)
+                defragged++, zs->zsl = newzsl;
+            d = zs->dict;
+            di = dictGetIterator(d);
+            while((de = dictNext(di)) != NULL) {
+                double* newscore;
+                sds sdsele = dictGetKey(de);
+                if ((newsds = activeDefragSds(sdsele)))
+                    defragged++, de->key = newsds;
+                newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
+                if (newscore) {
+                    dictSetVal(d, de, newscore);
+                    defragged++;
+                }
+                defragged += dictIterDefragEntry(di);
+            }
+            dictReleaseIterator(di);
+            dictDefragTables(&zs->dict);
+        } else {
+            serverPanic("Unknown sorted set encoding");
+        }
+    } else if (ob->type == OBJ_HASH) {
+        if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                defragged++, ob->ptr = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_HT) {
+            d = ob->ptr;
+            di = dictGetIterator(d);
+            while((de = dictNext(di)) != NULL) {
+                sds sdsele = dictGetKey(de);
+                if ((newsds = activeDefragSds(sdsele)))
+                    defragged++, de->key = newsds;
+                sdsele = dictGetVal(de);
+                if ((newsds = activeDefragSds(sdsele)))
+                    defragged++, de->v.val = newsds;
+                defragged += dictIterDefragEntry(di);
+            }
+            dictReleaseIterator(di);
+            dictDefragTables((dict**)&ob->ptr);
+        } else {
+            serverPanic("Unknown hash encoding");
+        }
+    } else {
+        serverPanic("Unknown object type");
+    }
+    return defragged;
+}
+
+/* defrag scan callback for the main db dictionary */
+void defragScanCallback(void *privdata, const dictEntry *de) {
+    /* TODO: defrag the dictEntry (and also the entry in the expires dict). */
+    int defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
+    server.stat_active_defrag_hits += defragged;
+    if(defragged)
+        server.stat_active_defrag_key_hits++;
+    else
+        server.stat_active_defrag_key_misses++;
+}
+
+/* defrag scan callback for each hash table bucket,
+ * used in order to defrag the dictEntry allocations */
+void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
+    UNUSED(privdata);
+    while(*bucketref) {
+        dictEntry *de = *bucketref, *newde;
+        if ((newde = activeDefragAlloc(de))) {
+            *bucketref = newde;
+        }
+        bucketref = &(*bucketref)->next;
+    }
+}
+
+/* Utility function to get the fragmentation ratio from jemalloc.
+ * it is critical to do that by comparing only heap maps that belong to jemalloc, and skip the ones jemalloc keeps as spare.
+ * since we use this fragmentation ratio in order to decide if a defrag action should be taken or not,
+ * a false detection can cause the defragmenter to waste a lot of CPU without the possibility of getting any results. */
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+    size_t epoch = 1, allocated = 0, resident = 0, active = 0, sz = sizeof(size_t);
+    je_mallctl("epoch", &epoch, &sz, &epoch, sz); /* Update the statistics cached by mallctl. */
+    je_mallctl("stats.resident", &resident, &sz, NULL, 0); /* unlike RSS, this does not include RSS from shared libraries and other non heap mappings */
+    je_mallctl("stats.active", &active, &sz, NULL, 0); /* unlike resident, this doesn't include the pages jemalloc reserves for re-use (purge will clean that) */
+    je_mallctl("stats.allocated", &allocated, &sz, NULL, 0); /* unlike zmalloc_used_memory, this matches the stats.resident by taking into account all allocations done by this process (not only zmalloc) */
+    float frag_pct = ((float)active / allocated)*100 - 100;
+    size_t frag_bytes = active - allocated;
+    float rss_pct = ((float)resident / allocated)*100 - 100;
+    size_t rss_bytes = resident - allocated;
+    if(out_frag_bytes)
+        *out_frag_bytes = frag_bytes;
+    serverLog(LL_DEBUG,
+        "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss_bytes)",
+        allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
+    return frag_pct;
+}
+
+#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
+#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
+
+/* Perform incremental defragmentation work from the serverCron.
+ * This works in a similar way to activeExpireCycle, in the sense that
+ * we do incremental work across calls. */
+void activeDefragCycle(void) {
+    static int current_db = -1;
+    static unsigned long cursor = 0;
+    static redisDb *db = NULL;
+    static long long start_scan, start_stat;
+    unsigned int iterations = 0;
+    unsigned long long defragged = server.stat_active_defrag_hits;
+    long long start, timelimit;
+
+    if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
+        return; /* defragging memory while there's a fork will just do damage. */
+
+    /* once a second, check if the fragmentation justifies starting a scan or making it more aggressive */
+    run_with_period(1000) {
+        size_t frag_bytes;
+        float frag_pct = getAllocatorFragmentation(&frag_bytes);
+        /* if we're not already running, and below the threshold, exit. */
+        if (!server.active_defrag_running) {
+            if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
+                return;
+        }
+
+        /* calculate the adaptive aggressiveness of the defrag */
+        int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper,
+            server.active_defrag_cycle_min, server.active_defrag_cycle_max);
+        cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max);
+        /* we allow increasing the aggressiveness during a scan, but don't reduce it */
+        if (!server.active_defrag_running || cpu_pct > server.active_defrag_running) {
+            server.active_defrag_running = cpu_pct;
+            serverLog(LL_VERBOSE,
+                "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+                frag_pct, frag_bytes, cpu_pct);
+        }
+    }
+    if (!server.active_defrag_running)
+        return;
+
+    /* See activeExpireCycle for how timelimit is handled. */
+    start = ustime();
+    timelimit = 1000000*server.active_defrag_running/server.hz/100;
+    if (timelimit <= 0) timelimit = 1;
+
+    do {
+        if (!cursor) {
+            /* Move on to next database, and stop if we reached the last one */
+            if (++current_db >= server.dbnum) {
+                long long now = ustime();
+                size_t frag_bytes;
+                float frag_pct = getAllocatorFragmentation(&frag_bytes);
+                serverLog(LL_VERBOSE,
+                    "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
+                    (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);
+
+                start_scan = now;
+                current_db = -1;
+                cursor = 0;
+                db = NULL;
+                server.active_defrag_running = 0;
+                return;
+            }
+            else if (current_db==0) {
+                /* start a scan from the first database */
+                start_scan = ustime();
+                start_stat = server.stat_active_defrag_hits;
+            }
+
+            db = &server.db[current_db];
+            cursor = 0;
+        }
+
+        do {
+            cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
+            /* once in 16 scan iterations, or 1000 pointer reallocations (if we have a lot of pointers in one hash bucket),
+             * check if we reached the time limit */
+            if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) {
+                if ((ustime() - start) > timelimit) {
+                    return;
+                }
+                iterations = 0;
+                defragged = server.stat_active_defrag_hits;
+            }
+        } while(cursor);
+    } while(1);
+}
+
+#else /* USE_JEMALLOC */
+
+void activeDefragCycle(void) {
+    /* not implemented yet */
+}
+
+#endif
\ No newline at end of file
diff --git a/src/dict.c b/src/dict.c
index b9b2390f1..7b093ac57 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -885,6 +885,7 @@ static unsigned long rev(unsigned long v) {
 unsigned long dictScan(dict *d,
                        unsigned long v,
                        dictScanFunction *fn,
+                       dictScanBucketFunction* bucketfn,
                        void *privdata)
 {
     dictht *t0, *t1;
@@ -898,6 +899,7 @@ unsigned long dictScan(dict *d,
         m0 = t0->sizemask;
 
         /* Emit entries at cursor */
+        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
         de = t0->table[v & m0];
         while (de) {
             next = de->next;
@@ -919,6 +921,7 @@ unsigned long dictScan(dict *d,
         m1 = t1->sizemask;
 
         /* Emit entries at cursor */
+        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
         de = t0->table[v & m0];
         while (de) {
             next = de->next;
@@ -930,6 +933,7 @@ unsigned long dictScan(dict *d,
              * of the index pointed to by the cursor in the smaller table */
             do {
                 /* Emit entries at cursor */
+                if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
                 de = t1->table[v & m1];
                 while (de) {
                     next = de->next;
@@ -1040,6 +1044,35 @@ void dictDisableResize(void) {
     dict_can_resize = 0;
 }
 
+unsigned int dictGetHash(dict *d, const void *key) {
+    return dictHashKey(d, key);
+}
+
+/* Replace an old key pointer in the dictionary with a new pointer.
+ * oldkey is a dead pointer and should not be accessed.
+ * the hash value should be provided using dictGetHash.
+ * no string / key comparison is performed.
+ * return value is the dictEntry if found, or NULL if not found. */
+dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash) {
+    dictEntry *he;
+    unsigned int idx, table;
+
+    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
+    for (table = 0; table <= 1; table++) {
+        idx = hash & d->ht[table].sizemask;
+        he = d->ht[table].table[idx];
+        while(he) {
+            if (oldptr==he->key) {
+                he->key = newptr;
+                return he;
+            }
+            he = he->next;
+        }
+        if (!dictIsRehashing(d)) return NULL;
+    }
+    return NULL;
+}
+
 /* ------------------------------- Debugging ---------------------------------*/
 
 #define DICT_STATS_VECTLEN 50
diff --git a/src/dict.h b/src/dict.h
index 04b247a25..fcb68d998 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -95,6 +95,7 @@ typedef struct dictIterator {
 } dictIterator;
 
 typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
+typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
 
 /* This is the initial size of every hash table */
 #define DICT_HT_INITIAL_SIZE     4
@@ -176,7 +177,9 @@ int dictRehash(dict *d, int n);
 int dictRehashMilliseconds(dict *d, int ms);
 void dictSetHashFunctionSeed(unsigned int initval);
 unsigned int dictGetHashFunctionSeed(void);
-unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
+unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
+unsigned int dictGetHash(dict *d, const void *key);
+dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash);
 
 /* Hash table types */
 extern dictType dictTypeHeapStringCopyKey;
diff --git a/src/server.c b/src/server.c
index f6868832d..8bf6510de 100644
--- a/src/server.c
+++ b/src/server.c
@@ -876,6 +876,10 @@ void databasesCron(void) {
         expireSlaveKeys();
     }
 
+    /* Defrag keys gradually. */
+    if (server.active_defrag_enabled)
+        activeDefragCycle();
+
     /* Perform hash tables rehashing if needed, but only if there are no
      * other processes saving the DB on disk. Otherwise rehashing is bad
      * as will cause a lot of copy-on-write of memory pages. */
@@ -1332,6 +1336,12 @@ void initServerConfig(void) {
     server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
     server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
     server.active_expire_enabled = 1;
+    server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
+    server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
+    server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
+    server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
+    server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
+    server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
     server.saveparams = NULL;
     server.loading = 0;
@@ -1368,6 +1378,7 @@ void initServerConfig(void) {
     server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
     server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
     server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
+    server.active_defrag_running = 0;
     server.notify_keyspace_events = 0;
     server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
     server.bpop_blocked_clients = 0;
@@ -1718,6 +1729,10 @@ void resetServerStats(void) {
     server.stat_evictedkeys = 0;
     server.stat_keyspace_misses = 0;
     server.stat_keyspace_hits = 0;
+    server.stat_active_defrag_hits = 0;
+    server.stat_active_defrag_misses = 0;
+    server.stat_active_defrag_key_hits = 0;
+    server.stat_active_defrag_key_misses = 0;
     server.stat_fork_time = 0;
     server.stat_fork_rate = 0;
     server.stat_rejected_conn = 0;
@@ -2873,6 +2888,7 @@ sds genRedisInfoString(char *section) {
             "maxmemory_policy:%s\r\n"
             "mem_fragmentation_ratio:%.2f\r\n"
             "mem_allocator:%s\r\n"
+            "active_defrag_running:%d\r\n"
             "lazyfree_pending_objects:%zu\r\n",
             zmalloc_used,
             hmem,
@@ -2894,6 +2910,7 @@ sds genRedisInfoString(char *section) {
             evict_policy,
             mh->fragmentation,
             ZMALLOC_LIB,
+            server.active_defrag_running,
             lazyfreeGetPendingObjectsCount()
         );
         freeMemoryOverheadData(mh);
@@ -3013,7 +3030,11 @@ sds genRedisInfoString(char *section) {
             "pubsub_patterns:%lu\r\n"
             "latest_fork_usec:%lld\r\n"
             "migrate_cached_sockets:%ld\r\n"
-            "slave_expires_tracked_keys:%zu\r\n",
+            "slave_expires_tracked_keys:%zu\r\n"
+            "active_defrag_hits:%lld\r\n"
+            "active_defrag_misses:%lld\r\n"
+            "active_defrag_key_hits:%lld\r\n"
+            "active_defrag_key_misses:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
             getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -3033,7 +3054,11 @@ sds genRedisInfoString(char *section) {
             listLength(server.pubsub_patterns),
             server.stat_fork_time,
             dictSize(server.migrate_cached_sockets),
-            getSlaveKeyWithExpireCount());
+            getSlaveKeyWithExpireCount(),
+            server.stat_active_defrag_hits,
+            server.stat_active_defrag_misses,
+            server.stat_active_defrag_key_hits,
+            server.stat_active_defrag_key_misses);
     }
 
     /* Replication */
diff --git a/src/server.h b/src/server.h
index 140897c18..17e12d9d6 100644
--- a/src/server.h
+++ b/src/server.h
@@ -152,6 +152,12 @@ typedef long long mstime_t; /* millisecond time type. */
 #define CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE 0
 #define CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL 0
 #define CONFIG_DEFAULT_ALWAYS_SHOW_LOGO 0
+#define CONFIG_DEFAULT_ACTIVE_DEFRAG 1
+#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */
+#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */
+#define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */
+#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 25 /* 25% CPU min (at lower threshold) */
+#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
 
 #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Lookups per loop. */
 #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
@@ -857,6 +863,7 @@ struct redisServer {
     unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */
     int shutdown_asap;          /* SHUTDOWN needed ASAP */
     int activerehashing;        /* Incremental rehash in serverCron() */
+    int active_defrag_running;  /* Active defragmentation running (holds current scan aggressiveness) */
     char *requirepass;          /* Pass for AUTH command, or NULL */
     char *pidfile;              /* PID file path */
     int arch_bits;              /* 32 or 64 depending on sizeof(long) */
@@ -908,6 +915,10 @@ struct redisServer {
     long long stat_evictedkeys;     /* Number of evicted keys (maxmemory) */
     long long stat_keyspace_hits;   /* Number of successful lookups of keys */
     long long stat_keyspace_misses; /* Number of failed lookups of keys */
+    long long stat_active_defrag_hits;      /* number of allocations moved */
+    long long stat_active_defrag_misses;    /* number of allocations scanned but not moved */
+    long long stat_active_defrag_key_hits;  /* number of keys with moved allocations */
+    long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
     size_t stat_peak_memory;        /* Max used memory record */
     long long stat_fork_time;       /* Time needed to perform latest fork() */
     double stat_fork_rate;          /* Fork rate in GB/sec. */
@@ -937,6 +948,12 @@ struct redisServer {
     int maxidletime;                /* Client timeout in seconds */
     int tcpkeepalive;               /* Set SO_KEEPALIVE if non-zero. */
    int active_expire_enabled;      /* Can be disabled for testing purposes. */
+    int active_defrag_enabled;
+    size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
+    int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
+    int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
+    int active_defrag_cycle_min;       /* minimal effort for defrag in CPU percentage */
+    int active_defrag_cycle_max;       /* maximal effort for defrag in CPU percentage */
     size_t client_max_querybuf_len; /* Limit for client query buffer length */
     int dbnum;                      /* Total number of configured DBs */
     int supervised;                 /* 1 if supervised, 0 otherwise. */
@@ -1576,6 +1593,7 @@ void adjustOpenFilesLimit(void);
 void closeListeningSockets(int unlink_unix_socket);
 void updateCachedTime(void);
 void resetServerStats(void);
+void activeDefragCycle(void);
 unsigned int getLRUClock(void);
 const char *evictPolicyToString(void);
 struct redisMemOverhead *getMemoryOverheadData(void);
diff --git a/src/zmalloc.c b/src/zmalloc.c
index 22bf84fce..974e4fc12 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -66,6 +66,8 @@ void zlibc_free(void *ptr) {
 #define calloc(count,size) je_calloc(count,size)
 #define realloc(ptr,size) je_realloc(ptr,size)
 #define free(ptr) je_free(ptr)
+#define mallocx(size,flags) je_mallocx(size,flags)
+#define dallocx(ptr,flags) je_dallocx(ptr,flags)
 #endif
 
 #define update_zmalloc_stat_alloc(__n) do { \
@@ -115,6 +117,24 @@ void *zmalloc(size_t size) {
 #endif
 }
 
+/* Allocation and free functions that bypass the thread cache
+ * and go straight to the allocator arena bins.
+ * Currently implemented only for jemalloc */
+#if defined(USE_JEMALLOC) && defined(MALLOCX_TCACHE_NONE)
+void *zmalloc_no_tcache(size_t size) {
+    void *ptr = mallocx(size+PREFIX_SIZE, MALLOCX_TCACHE_NONE);
+    if (!ptr) zmalloc_oom_handler(size);
+    update_zmalloc_stat_alloc(zmalloc_size(ptr));
+    return ptr;
+}
+
+void zfree_no_tcache(void *ptr) {
+    if (ptr == NULL) return;
+    update_zmalloc_stat_free(zmalloc_size(ptr));
+    dallocx(ptr, MALLOCX_TCACHE_NONE);
+}
+#endif
+
 void *zcalloc(size_t size) {
     void *ptr = calloc(1, size+PREFIX_SIZE);
 
diff --git a/src/zmalloc.h b/src/zmalloc.h
index 9badf8f4c..2f7015490 100644
--- a/src/zmalloc.h
+++ b/src/zmalloc.h
@@ -69,6 +69,8 @@ void *zmalloc(size_t size);
 void *zcalloc(size_t size);
 void *zrealloc(void *ptr, size_t size);
 void zfree(void *ptr);
+void zfree_no_tcache(void *ptr);
+void *zmalloc_no_tcache(size_t size);
 char *zstrdup(const char *s);
 size_t zmalloc_used_memory(void);
 void zmalloc_enable_thread_safeness(void);
-- 
cgit v1.2.1
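
A note on the je_get_defrag_hint() contract introduced in the jemalloc hunk: both utilizations come back in 16:16 fixed point, so 1<<16 represents 100%. The standalone sketch below (not part of the patch; the sample numbers are invented) reproduces the decision activeDefragAlloc() derives from those two values: an allocation is only worth moving when its run is no more utilized than the bin average, and not completely full.

    /* standalone sketch, mirrors the run_util/bin_util test in activeDefragAlloc() */
    #include <stdio.h>

    /* move only when this run is LESS utilized than the bin average,
     * and not completely full (1<<16 == 100% in 16:16 fixed point) */
    static int worth_moving(int bin_util, int run_util) {
        if (run_util > bin_util || run_util == 1<<16) return 0; /* skip */
        return 1; /* re-allocate elsewhere, packing sparse runs */
    }

    int main(void) {
        /* e.g. bin is 75% utilized on average, this run only 25% */
        int bin_util = (3 << 16) / 4, run_util = 1 << 14;
        printf("move=%d\n", worth_moving(bin_util, run_util)); /* move=1 */
        /* a completely full run is never worth moving */
        printf("move=%d\n", worth_moving(bin_util, 1 << 16)); /* move=0 */
        return 0;
    }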
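
The pointer fix-up in activeDefragSds() relies on an sds string pointing a fixed distance past its allocation header, so after the block moves the same offset must be re-applied to the new base address. A minimal standalone sketch of that re-basing (not part of the patch; the 3-byte header is an invented stand-in for the real sds header):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void) {
        size_t hdrlen = 3;                 /* pretend header size */
        char *alloc = malloc(hdrlen + 16); /* what the allocator sees */
        char *s = alloc + hdrlen;          /* what the application holds */
        strcpy(s, "hello");

        size_t offset = s - alloc;         /* header offset, taken before the move */

        /* "move" the allocation, as activeDefragAlloc() would */
        char *newalloc = malloc(hdrlen + 16);
        memcpy(newalloc, alloc, hdrlen + 16);
        free(alloc);

        char *news = newalloc + offset;    /* re-based application pointer */
        printf("%s\n", news);              /* prints: hello */
        free(newalloc);
        return 0;
    }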
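
The new dictScanBucketFunction hook hands the callback a pointer to the bucket's head link rather than to the entries themselves, which is what lets defragDictBucketCallback replace every dictEntry in the chain while fixing up its predecessor's link in the same step. A toy standalone model of that contract (not part of the patch; move_alloc() stands in for activeDefragAlloc() and always "moves" the allocation):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    typedef struct entry { int v; struct entry *next; } entry;

    /* stand-in for activeDefragAlloc(): copy to a new block, free the old one */
    static entry *move_alloc(entry *e) {
        entry *n = malloc(sizeof(*n));
        memcpy(n, e, sizeof(*n));
        free(e);
        return n;
    }

    /* same shape as defragDictBucketCallback: advance through the link that
     * points at each node, so the predecessor is patched for free */
    static void bucket_cb(entry **bucketref) {
        while (*bucketref) {
            entry *newe = move_alloc(*bucketref);
            if (newe) *bucketref = newe;
            bucketref = &(*bucketref)->next;
        }
    }

    int main(void) {
        entry *head = malloc(sizeof(*head));
        head->v = 1;
        head->next = malloc(sizeof(*head));
        head->next->v = 2;
        head->next->next = NULL;
        bucket_cb(&head);
        printf("%d %d\n", head->v, head->next->v); /* 1 2, at new addresses */
        free(head->next); free(head);
        return 0;
    }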
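
Finally, how the effort knobs compose: activeDefragCycle() interpolates the measured fragmentation percentage between the two thresholds to pick a CPU percentage, then converts that into a per-serverCron time budget. A standalone sketch using this patch's defaults (lower=10, upper=100, cycle-min=25, cycle-max=75) and an assumed hz of 10:

    #include <stdio.h>

    /* same macros as in defrag.c */
    #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
    #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))

    int main(void) {
        int lower = 10, upper = 100, cycle_min = 25, cycle_max = 75, hz = 10;
        float frag_pct = 40; /* pretend getAllocatorFragmentation() returned 40% */
        int cpu_pct = INTERPOLATE(frag_pct, lower, upper, cycle_min, cycle_max);
        cpu_pct = LIMIT(cpu_pct, cycle_min, cycle_max);
        /* 25 + (40-10)*(75-25)/(100-10) ~= 41% of CPU */
        long long timelimit = 1000000LL*cpu_pct/hz/100; /* usec per cron call */
        printf("cpu=%d%% timelimit=%lldus\n", cpu_pct, timelimit); /* cpu=41% timelimit=41000us */
        return 0;
    }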