diff options
-rw-r--r-- | deps/jemalloc/src/background_thread.c | 8 | ||||
-rw-r--r-- | src/cluster.c | 32 | ||||
-rw-r--r-- | src/config.c | 1 | ||||
-rw-r--r-- | src/db.c | 14 | ||||
-rw-r--r-- | src/debug.c | 62 | ||||
-rw-r--r-- | src/defrag.c | 2 | ||||
-rw-r--r-- | src/evict.c | 7 | ||||
-rw-r--r-- | src/geo.c | 8 | ||||
-rw-r--r-- | src/lolwut.c | 4 | ||||
-rw-r--r-- | src/lolwut.h | 2 | ||||
-rw-r--r-- | src/lolwut5.c | 6 | ||||
-rw-r--r-- | src/lolwut6.c | 133 | ||||
-rw-r--r-- | src/object.c | 20 | ||||
-rw-r--r-- | src/rax.c | 3 | ||||
-rw-r--r-- | src/replication.c | 5 | ||||
-rw-r--r-- | src/server.c | 24 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | src/stream.h | 2 | ||||
-rw-r--r-- | src/t_stream.c | 6 | ||||
-rw-r--r-- | src/zmalloc.c | 32 | ||||
-rw-r--r-- | src/zmalloc.h | 2 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 2 | ||||
-rw-r--r-- | tests/unit/geo.tcl | 2 |
23 files changed, 314 insertions, 64 deletions
diff --git a/deps/jemalloc/src/background_thread.c b/deps/jemalloc/src/background_thread.c index 3517a3bb8..457669c9e 100644 --- a/deps/jemalloc/src/background_thread.c +++ b/deps/jemalloc/src/background_thread.c @@ -787,7 +787,13 @@ background_thread_stats_read(tsdn_t *tsdn, background_thread_stats_t *stats) { nstime_init(&stats->run_interval, 0); for (unsigned i = 0; i < max_background_threads; i++) { background_thread_info_t *info = &background_thread_info[i]; - malloc_mutex_lock(tsdn, &info->mtx); + if (malloc_mutex_trylock(tsdn, &info->mtx)) { + /* + * Each background thread run may take a long time; + * avoid waiting on the stats if the thread is active. + */ + continue; + } if (info->state != background_thread_stopped) { num_runs += info->tot_n_runs; nstime_add(&stats->run_interval, &info->tot_sleep_time); diff --git a/src/cluster.c b/src/cluster.c index 1e7dcd50e..93be2aa32 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2140,7 +2140,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * full length of the packet. When a whole packet is in memory this function * will call the function to process the packet. And so forth. */ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; ssize_t nread; clusterMsg *hdr; clusterLink *link = (clusterLink*) privdata; @@ -2517,7 +2517,8 @@ void clusterBroadcastPong(int target) { * * If link is NULL, then the message is broadcasted to the whole cluster. */ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { - unsigned char buf[sizeof(clusterMsg)], *payload; + unsigned char *payload; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; uint32_t channel_len, message_len; @@ -2537,7 +2538,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { /* Try to use the local buffer if possible */ if (totlen < sizeof(buf)) { - payload = buf; + payload = (unsigned char*)buf; } else { payload = zmalloc(totlen); memcpy(payload,hdr,sizeof(*hdr)); @@ -2554,7 +2555,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { decrRefCount(channel); decrRefCount(message); - if (payload != buf) zfree(payload); + if (payload != (unsigned char*)buf) zfree(payload); } /* Send a FAIL message to all the nodes we are able to contact. @@ -2563,7 +2564,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { * we switch the node state to CLUSTER_NODE_FAIL and ask all the other * nodes to do the same ASAP. */ void clusterSendFail(char *nodename) { - unsigned char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); @@ -2575,7 +2576,7 @@ void clusterSendFail(char *nodename) { * slots configuration. The node name, slots bitmap, and configEpoch info * are included. */ void clusterSendUpdate(clusterLink *link, clusterNode *node) { - unsigned char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; if (link == NULL) return; @@ -2583,7 +2584,7 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) { memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN); hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch); memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots)); - clusterSendMessage(link,buf,ntohl(hdr->totlen)); + clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen)); } /* Send a MODULE message. @@ -2591,7 +2592,8 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) { * If link is NULL, then the message is broadcasted to the whole cluster. */ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) { - unsigned char buf[sizeof(clusterMsg)], *heapbuf; + unsigned char *heapbuf; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -2606,7 +2608,7 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, /* Try to use the local buffer if possible */ if (totlen < sizeof(buf)) { - heapbuf = buf; + heapbuf = (unsigned char*)buf; } else { heapbuf = zmalloc(totlen); memcpy(heapbuf,hdr,sizeof(*hdr)); @@ -2619,7 +2621,7 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, else clusterBroadcastMessage(heapbuf,totlen); - if (heapbuf != buf) zfree(heapbuf); + if (heapbuf != (unsigned char*)buf) zfree(heapbuf); } /* This function gets a cluster node ID string as target, the same way the nodes @@ -2663,7 +2665,7 @@ void clusterPropagatePublish(robj *channel, robj *message) { * Note that we send the failover request to everybody, master and slave nodes, * but only the masters are supposed to reply to our query. */ void clusterRequestFailoverAuth(void) { - unsigned char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -2679,7 +2681,7 @@ void clusterRequestFailoverAuth(void) { /* Send a FAILOVER_AUTH_ACK message to the specified node. */ void clusterSendFailoverAuth(clusterNode *node) { - unsigned char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -2687,12 +2689,12 @@ void clusterSendFailoverAuth(clusterNode *node) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - clusterSendMessage(node->link,buf,totlen); + clusterSendMessage(node->link,(unsigned char*)buf,totlen); } /* Send a MFSTART message to the specified node. */ void clusterSendMFStart(clusterNode *node) { - unsigned char buf[sizeof(clusterMsg)]; + clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -2700,7 +2702,7 @@ void clusterSendMFStart(clusterNode *node) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - clusterSendMessage(node->link,buf,totlen); + clusterSendMessage(node->link,(unsigned char*)buf,totlen); } /* Vote for the node asking for our vote if there are the conditions. */ diff --git a/src/config.c b/src/config.c index 0b3bb1cd6..72fb038ea 100644 --- a/src/config.c +++ b/src/config.c @@ -144,6 +144,7 @@ configYesNo configs_yesno[] = { {"replica-serve-stale-data","slave-serve-stale-data",&server.repl_serve_stale_data,1,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA}, {"replica-read-only","slave-read-only",&server.repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY}, {"replica-ignore-maxmemory","slave-ignore-maxmemory",&server.repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY}, + {"jemalloc-bg-thread",NULL,&server.jemalloc_bg_thread,1,1}, {NULL, NULL, 0, 0} }; @@ -457,6 +457,13 @@ void flushdbCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; server.dirty += emptyDb(c->db->id,flags,NULL); addReply(c,shared.ok); +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif } /* FLUSHALL [ASYNC] @@ -479,6 +486,13 @@ void flushallCommand(client *c) { server.dirty = saved_dirty; } server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/debug.c b/src/debug.c index 15db2157f..29a244e24 100644 --- a/src/debug.c +++ b/src/debug.c @@ -297,6 +297,56 @@ void computeDatasetDigest(unsigned char *final) { } } +#ifdef USE_JEMALLOC +void mallctl_int(client *c, robj **argv, int argc) { + int ret; + /* start with the biggest size (int64), and if that fails, try smaller sizes (int32, bool) */ + int64_t old = 0, val; + if (argc > 1) { + long long ll; + if (getLongLongFromObjectOrReply(c, argv[1], &ll, NULL) != C_OK) + return; + val = ll; + } + size_t sz = sizeof(old); + while (sz > 0) { + if ((ret=je_mallctl(argv[0]->ptr, &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) { + if (ret==EINVAL) { + /* size might be wrong, try a smaller one */ + sz /= 2; +#if BYTE_ORDER == BIG_ENDIAN + val <<= 8*sz; +#endif + continue; + } + addReplyErrorFormat(c,"%s", strerror(ret)); + return; + } else { +#if BYTE_ORDER == BIG_ENDIAN + old >>= 64 - 8*sz; +#endif + addReplyLongLong(c, old); + return; + } + } + addReplyErrorFormat(c,"%s", strerror(EINVAL)); +} + +void mallctl_string(client *c, robj **argv, int argc) { + int ret; + char *old; + size_t sz = sizeof(old); + /* for strings, it seems we need to first get the old value, before overriding it. */ + if ((ret=je_mallctl(argv[0]->ptr, &old, &sz, NULL, 0))) { + addReplyErrorFormat(c,"%s", strerror(ret)); + return; + } + addReplyBulkCString(c, old); + if(argc > 1) + je_mallctl(argv[0]->ptr, NULL, 0, &argv[1]->ptr, sizeof(char*)); +} +#endif + void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -323,6 +373,10 @@ void debugCommand(client *c) { "STRUCTSIZE -- Return the size of different Redis core C structures.", "ZIPLIST <key> -- Show low level info about the ziplist encoding.", "STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", +#ifdef USE_JEMALLOC +"MALLCTL <key> [<val>] -- Get or set a malloc tunning integer.", +"MALLCTL-STR <key> [<val>] -- Get or set a malloc tunning string.", +#endif NULL }; addReplyHelp(c, help); @@ -677,6 +731,14 @@ NULL { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); +#ifdef USE_JEMALLOC + } else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) { + mallctl_int(c, c->argv+2, c->argc-2); + return; + } else if(!strcasecmp(c->argv[1]->ptr,"mallctl-str") && c->argc >= 3) { + mallctl_string(c, c->argv+2, c->argc-2); + return; +#endif } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/defrag.c b/src/defrag.c index 50f6b41f2..e794c8e41 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -374,7 +374,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { if ((newele = activeDefragStringOb(ele, &defragged))) de->v.val = newele, defragged++; } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = ln->value; + void *newptr, *ptr = dictGetVal(de); if ((newptr = activeDefragAlloc(ptr))) ln->value = newptr, defragged++; } diff --git a/src/evict.c b/src/evict.c index 176f4c362..71260c040 100644 --- a/src/evict.c +++ b/src/evict.c @@ -444,6 +444,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { + int keys_freed = 0; /* By default replicas should ignore maxmemory * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; @@ -467,7 +468,7 @@ int freeMemoryIfNeeded(void) { latencyStartMonitor(latency); while (mem_freed < mem_tofree) { - int j, k, i, keys_freed = 0; + int j, k, i; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; @@ -598,9 +599,7 @@ int freeMemoryIfNeeded(void) { mem_freed = mem_tofree; } } - } - - if (!keys_freed) { + } else { latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); goto cant_free; /* nothing to free... */ @@ -734,14 +734,14 @@ void geohashCommand(client *c) { r[1].max = 90; geohashEncode(&r[0],&r[1],xy[0],xy[1],26,&hash); - char buf[12]; + char buf[11]; int i; - for (i = 0; i < 11; i++) { + for (i = 0; i < 10; i++) { int idx = (hash.bits >> (52-((i+1)*5))) & 0x1f; buf[i] = geoalphabet[idx]; } - buf[11] = '\0'; - addReplyBulkCBuffer(c,buf,11); + buf[10] = '\0'; + addReplyBulkCBuffer(c,buf,10); } } } diff --git a/src/lolwut.c b/src/lolwut.c index 7e2ceca78..0e1552ba0 100644 --- a/src/lolwut.c +++ b/src/lolwut.c @@ -90,12 +90,12 @@ void lolwutCommand(client *c) { * canvas implementation that can be reused. */ /* Allocate and return a new canvas of the specified size. */ -lwCanvas *lwCreateCanvas(int width, int height) { +lwCanvas *lwCreateCanvas(int width, int height, int bgcolor) { lwCanvas *canvas = zmalloc(sizeof(*canvas)); canvas->width = width; canvas->height = height; canvas->pixels = zmalloc(width*height); - memset(canvas->pixels,0,width*height); + memset(canvas->pixels,bgcolor,width*height); return canvas; } diff --git a/src/lolwut.h b/src/lolwut.h index c049ac907..38c0de423 100644 --- a/src/lolwut.h +++ b/src/lolwut.h @@ -41,7 +41,7 @@ typedef struct lwCanvas { } lwCanvas; /* Drawing functions implemented inside lolwut.c. */ -lwCanvas *lwCreateCanvas(int width, int height); +lwCanvas *lwCreateCanvas(int width, int height, int bgcolor); void lwFreeCanvas(lwCanvas *canvas); void lwDrawPixel(lwCanvas *canvas, int x, int y, int color); int lwGetPixel(lwCanvas *canvas, int x, int y); diff --git a/src/lolwut5.c b/src/lolwut5.c index 4e9828145..5a9348800 100644 --- a/src/lolwut5.c +++ b/src/lolwut5.c @@ -74,7 +74,7 @@ lwCanvas *lwDrawSchotter(int console_cols, int squares_per_row, int squares_per_ int padding = canvas_width > 4 ? 2 : 0; float square_side = (float)(canvas_width-padding*2) / squares_per_row; int canvas_height = square_side * squares_per_col + padding*2; - lwCanvas *canvas = lwCreateCanvas(canvas_width, canvas_height); + lwCanvas *canvas = lwCreateCanvas(canvas_width, canvas_height, 0); for (int y = 0; y < squares_per_col; y++) { for (int x = 0; x < squares_per_row; x++) { @@ -106,7 +106,7 @@ lwCanvas *lwDrawSchotter(int console_cols, int squares_per_row, int squares_per_ * logical canvas. The actual returned string will require a terminal that is * width/2 large and height/4 tall in order to hold the whole image without * overflowing or scrolling, since each Barille character is 2x4. */ -sds lwRenderCanvas(lwCanvas *canvas) { +static sds renderCanvas(lwCanvas *canvas) { sds text = sdsempty(); for (int y = 0; y < canvas->height; y += 4) { for (int x = 0; x < canvas->width; x += 2) { @@ -166,7 +166,7 @@ void lolwut5Command(client *c) { /* Generate some computer art and reply. */ lwCanvas *canvas = lwDrawSchotter(cols,squares_per_row,squares_per_col); - sds rendered = lwRenderCanvas(canvas); + sds rendered = renderCanvas(canvas); rendered = sdscat(rendered, "\nGeorg Nees - schotter, plotter on paper, 1968. Redis ver. "); rendered = sdscat(rendered,REDIS_VERSION); diff --git a/src/lolwut6.c b/src/lolwut6.c index 68f09036a..b76d80690 100644 --- a/src/lolwut6.c +++ b/src/lolwut6.c @@ -31,11 +31,133 @@ * This file implements the LOLWUT command. The command should do something * fun and interesting, and should be replaced by a new implementation at * each new version of Redis. + * + * Thanks to Michele Hiki Falcone for the original image that ispired + * the image, part of his game, Plaguemon. + * + * Thanks to the Shhh computer art collective for the help in tuning the + * output to have a better artistic effect. */ #include "server.h" #include "lolwut.h" +/* Render the canvas using the four gray levels of the standard color + * terminal: they match very well to the grayscale display of the gameboy. */ +static sds renderCanvas(lwCanvas *canvas) { + sds text = sdsempty(); + for (int y = 0; y < canvas->height; y++) { + for (int x = 0; x < canvas->width; x++) { + int color = lwGetPixel(canvas,x,y); + char *ce; /* Color escape sequence. */ + + /* Note that we set both the foreground and background color. + * This way we are able to get a more consistent result among + * different terminals implementations. */ + switch(color) { + case 0: ce = "0;30;40m"; break; /* Black */ + case 1: ce = "0;90;100m"; break; /* Gray 1 */ + case 2: ce = "0;37;47m"; break; /* Gray 2 */ + case 3: ce = "0;97;107m"; break; /* White */ + } + text = sdscatprintf(text,"\033[%s \033[0m",ce); + } + if (y != canvas->height-1) text = sdscatlen(text,"\n",1); + } + return text; +} + +/* Draw a skyscraper on the canvas, according to the parameters in the + * 'skyscraper' structure. Window colors are random and are always one + * of the two grays. */ +struct skyscraper { + int xoff; /* X offset. */ + int width; /* Pixels width. */ + int height; /* Pixels height. */ + int windows; /* Draw windows if true. */ + int color; /* Color of the skyscraper. */ +}; + +void generateSkyscraper(lwCanvas *canvas, struct skyscraper *si) { + int starty = canvas->height-1; + int endy = starty - si->height + 1; + for (int y = starty; y >= endy; y--) { + for (int x = si->xoff; x < si->xoff+si->width; x++) { + /* The roof is four pixels less wide. */ + if (y == endy && (x <= si->xoff+1 || x >= si->xoff+si->width-2)) + continue; + int color = si->color; + /* Alter the color if this is a place where we want to + * draw a window. We check that we are in the inner part of the + * skyscraper, so that windows are far from the borders. */ + if (si->windows && + x > si->xoff+1 && + x < si->xoff+si->width-2 && + y > endy+1 && + y < starty-1) + { + /* Calculate the x,y position relative to the start of + * the window area. */ + int relx = x - (si->xoff+1); + int rely = y - (endy+1); + + /* Note that we want the windows to be two pixels wide + * but just one pixel tall, because terminal "pixels" + * (characters) are not square. */ + if (relx/2 % 2 && rely % 2) { + do { + color = 1 + rand() % 2; + } while (color == si->color); + /* Except we want adjacent pixels creating the same + * window to be the same color. */ + if (relx % 2) color = lwGetPixel(canvas,x-1,y); + } + } + lwDrawPixel(canvas,x,y,color); + } + } +} + +/* Generate a skyline inspired by the parallax backgrounds of 8 bit games. */ +void generateSkyline(lwCanvas *canvas) { + struct skyscraper si; + + /* First draw the background skyscraper without windows, using the + * two different grays. We use two passes to make sure that the lighter + * ones are always in the background. */ + for (int color = 2; color >= 1; color--) { + si.color = color; + for (int offset = -10; offset < canvas->width;) { + offset += rand() % 8; + si.xoff = offset; + si.width = 10 + rand()%9; + if (color == 2) + si.height = canvas->height/2 + rand()%canvas->height/2; + else + si.height = canvas->height/2 + rand()%canvas->height/3; + si.windows = 0; + generateSkyscraper(canvas, &si); + if (color == 2) + offset += si.width/2; + else + offset += si.width+1; + } + } + + /* Now draw the foreground skyscraper with the windows. */ + si.color = 0; + for (int offset = -10; offset < canvas->width;) { + offset += rand() % 8; + si.xoff = offset; + si.width = 5 + rand()%14; + if (si.width % 4) si.width += (si.width % 3); + si.height = canvas->height/3 + rand()%canvas->height/2; + si.windows = 1; + generateSkyscraper(canvas, &si); + offset += si.width+5; + } +} + /* The LOLWUT 6 command: * * LOLWUT [columns] [rows] @@ -45,7 +167,7 @@ */ void lolwut6Command(client *c) { long cols = 80; - long rows = 40; + long rows = 20; /* Parse the optional arguments if any. */ if (c->argc > 1 && @@ -64,12 +186,15 @@ void lolwut6Command(client *c) { if (rows > 1000) rows = 1000; /* Generate the city skyline and reply. */ - sds rendered = sdsempty(); + lwCanvas *canvas = lwCreateCanvas(cols,rows,3); + generateSkyline(canvas); + sds rendered = renderCanvas(canvas); rendered = sdscat(rendered, - "\nDedicated to the 8 bit game developers of the past. Redis ver. "); + "\nDedicated to the 8 bit game developers of past and present.\n" + "Original 8 bit image from Plaguemon by hikikomori. Redis ver. "); rendered = sdscat(rendered,REDIS_VERSION); rendered = sdscatlen(rendered,"\n",1); addReplyVerbatim(c,rendered,sdslen(rendered),"txt"); sdsfree(rendered); - // lwFreeCanvas(canvas); + lwFreeCanvas(canvas); } diff --git a/src/object.c b/src/object.c index 697429b84..70022f897 100644 --- a/src/object.c +++ b/src/object.c @@ -1450,22 +1450,10 @@ NULL addReplyVerbatim(c,report,sdslen(report),"txt"); sdsfree(report); } else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) { -#if defined(USE_JEMALLOC) - char tmp[32]; - unsigned narenas = 0; - size_t sz = sizeof(unsigned); - if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) { - sprintf(tmp, "arena.%d.purge", narenas); - if (!je_mallctl(tmp, NULL, 0, NULL, 0)) { - addReply(c, shared.ok); - return; - } - } - addReplyError(c, "Error purging dirty pages"); -#else - addReply(c, shared.ok); - /* Nothing to do for other allocators. */ -#endif + if (jemalloc_purge() == 0) + addReply(c, shared.ok); + else + addReplyError(c, "Error purging dirty pages"); } else { addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr); } @@ -1791,7 +1791,8 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key if (eq && key_len == iter->key_len) return 1; else if (lt) return iter->key_len < key_len; else if (gt) return iter->key_len > key_len; - } if (cmp > 0) { + return 0; + } else if (cmp > 0) { return gt ? 1 : 0; } else /* (cmp < 0) */ { return lt ? 1 : 0; diff --git a/src/replication.c b/src/replication.c index f10a91677..5519b9ce2 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2182,7 +2182,10 @@ void replicationSetMaster(char *ip, int port) { cancelReplicationHandshake(); /* Before destroying our master state, create a cached master using * our own parameters, to later PSYNC with the new master. */ - if (was_master) replicationCacheMasterUsingMyself(); + if (was_master) { + replicationDiscardCachedMaster(); + replicationCacheMasterUsingMyself(); + } server.repl_state = REPL_STATE_CONNECT; } diff --git a/src/server.c b/src/server.c index 593f98f3f..f67175651 100644 --- a/src/server.c +++ b/src/server.c @@ -2260,6 +2260,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER; @@ -2904,8 +2905,17 @@ void initServer(void) { scriptingInit(1); slowlogInit(); latencyMonitorInit(); +} + +/* Some steps in server initialization need to be done last (after modules + * are loaded). + * Specifically, creation of threads due to a race bug in ld.so, in which + * Thread Local Storage initialization collides with dlopen call. + * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */ +void InitServerLast() { bioInit(); initThreadedIO(); + set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -3366,7 +3376,7 @@ int processCommand(client *c) { !c->authenticated; if (auth_required) { /* AUTH and HELLO are valid even in non authenticated state. */ - if (c->cmd->proc != authCommand || c->cmd->proc == helloCommand) { + if (c->cmd->proc != authCommand && c->cmd->proc != helloCommand) { flagTransaction(c); addReply(c,shared.noautherr); return C_OK; @@ -4278,7 +4288,7 @@ sds genRedisInfoString(char *section) { if (server.repl_state != REPL_STATE_CONNECTED) { info = sdscatprintf(info, "master_link_down_since_seconds:%jd\r\n", - (intmax_t)server.unixtime-server.repl_down_since); + (intmax_t)(server.unixtime-server.repl_down_since)); } info = sdscatprintf(info, "slave_priority:%d\r\n" @@ -4719,12 +4729,14 @@ void loadDataFromDisk(void) { (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ - if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&& + if ((server.masterhost || + (server.cluster_enabled && + nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db - * of -1 inside the RDB file in a wrong way, see more information - * in function rdbPopulateSaveInfo. */ + * of -1 inside the RDB file in a wrong way, see more + * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); @@ -5031,6 +5043,7 @@ int main(int argc, char **argv) { #endif moduleLoadFromQueue(); ACLLoadUsersAtStartup(); + InitServerLast(); loadDataFromDisk(); if (server.cluster_enabled) { if (verifyClusterConfigWithData() == C_ERR) { @@ -5045,6 +5058,7 @@ int main(int argc, char **argv) { if (server.sofd > 0) serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } else { + InitServerLast(); sentinelIsRunning(); } diff --git a/src/server.h b/src/server.h index 2e5749907..a14989237 100644 --- a/src/server.h +++ b/src/server.h @@ -1174,6 +1174,7 @@ struct redisServer { int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_defrag_enabled; + int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ diff --git a/src/stream.h b/src/stream.h index 8ae90ce77..1163b3527 100644 --- a/src/stream.h +++ b/src/stream.h @@ -88,7 +88,7 @@ typedef struct streamNACK { /* Stream propagation informations, passed to functions in order to propagate * XCLAIM commands to AOF and slaves. */ -typedef struct sreamPropInfo { +typedef struct streamPropInfo { robj *keyname; robj *groupname; } streamPropInfo; diff --git a/src/t_stream.c b/src/t_stream.c index 9e7d3d126..ea9a620f1 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -242,17 +242,17 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ * the current node is full. */ if (lp != NULL) { if (server.stream_node_max_bytes && - lp_bytes > server.stream_node_max_bytes) + lp_bytes >= server.stream_node_max_bytes) { lp = NULL; } else if (server.stream_node_max_entries) { int64_t count = lpGetInteger(lpFirst(lp)); - if (count > server.stream_node_max_entries) lp = NULL; + if (count >= server.stream_node_max_entries) lp = NULL; } } int flags = STREAM_ITEM_FLAG_NONE; - if (lp == NULL || lp_bytes > server.stream_node_max_bytes) { + if (lp == NULL || lp_bytes >= server.stream_node_max_bytes) { master_id = id; streamEncodeID(rax_key,&id); /* Create the listpack having the master entry ID and fields. */ diff --git a/src/zmalloc.c b/src/zmalloc.c index fd8bb6938..e02267fc9 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -326,6 +326,7 @@ size_t zmalloc_get_rss(void) { #endif #if defined(USE_JEMALLOC) + int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident) { @@ -347,13 +348,44 @@ int zmalloc_get_allocator_info(size_t *allocated, je_mallctl("stats.allocated", allocated, &sz, NULL, 0); return 1; } + +void set_jemalloc_bg_thread(int enable) { + /* let jemalloc do purging asynchronously, required when there's no traffic + * after flushdb */ + char val = !!enable; + je_mallctl("background_thread", NULL, 0, &val, 1); +} + +int jemalloc_purge() { + /* return all unused (reserved) pages to the OS */ + char tmp[32]; + unsigned narenas = 0; + size_t sz = sizeof(unsigned); + if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) { + sprintf(tmp, "arena.%d.purge", narenas); + if (!je_mallctl(tmp, NULL, 0, NULL, 0)) + return 0; + } + return -1; +} + #else + int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident) { *allocated = *resident = *active = 0; return 1; } + +void set_jemalloc_bg_thread(int enable) { + ((void)(enable)); +} + +int jemalloc_purge() { + return 0; +} + #endif /* Get the sum of the specified field (converted form kb to bytes) in diff --git a/src/zmalloc.h b/src/zmalloc.h index 6fb19b046..b136a910d 100644 --- a/src/zmalloc.h +++ b/src/zmalloc.h @@ -86,6 +86,8 @@ size_t zmalloc_used_memory(void); void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); size_t zmalloc_get_rss(void); int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident); +void set_jemalloc_bg_thread(int enable); +int jemalloc_purge(); size_t zmalloc_get_private_dirty(long pid); size_t zmalloc_get_smap_bytes_by_field(char *field, long pid); size_t zmalloc_get_memory_size(void); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 5d32555b0..1c18582c5 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -319,7 +319,7 @@ start_server {tags {"repl"}} { } } -test {slave fails full sync and diskless load swapdb recoveres it} { +test {slave fails full sync and diskless load swapdb recovers it} { start_server {tags {"repl"}} { set slave [srv 0 client] set slave_host [srv 0 host] diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl index 49e421ee9..76b9bda38 100644 --- a/tests/unit/geo.tcl +++ b/tests/unit/geo.tcl @@ -129,7 +129,7 @@ start_server {tags {"geo"}} { r del points r geoadd points -5.6 42.6 test lindex [r geohash points test] 0 - } {ezs42e44yx0} + } {ezs42e44yx} test {GEOPOS simple} { r del points |