 src/config.c                 |   5
 src/defrag.c                 | 210
 src/rax.c                    |   7
 src/rax.h                    |  10
 tests/integration/rdb.tcl    |   2
 tests/unit/memefficiency.tcl |  21
 6 files changed, 230 insertions(+), 25 deletions(-)
diff --git a/src/config.c b/src/config.c
index c7fc11556..c39b61e6c 100644
--- a/src/config.c
+++ b/src/config.c
@@ -431,6 +431,11 @@ void loadServerConfigFromString(char *config) {
if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ if (server.active_defrag_enabled) {
+#ifndef HAVE_DEFRAG
+ err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr;
+#endif
+ }
} else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
if ((server.daemonize = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
diff --git a/src/defrag.c b/src/defrag.c
index aae72adcb..b25fceb1e 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) {
return defragged;
}
+/* Defrag callback for radix tree iterator, called for each node,
+ * used in order to defrag the node allocations. */
+int defragRaxNode(raxNode **noderef) {
+ raxNode *newnode = activeDefragAlloc(*noderef);
+ if (newnode) {
+ *noderef = newnode;
+ return 1;
+ }
+ return 0;
+}
+
+/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
+int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
+ static unsigned char last[sizeof(streamID)];
+ raxIterator ri;
+ long iterations = 0;
+ if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
+ *cursor = 0;
+ return 0;
+ }
+
+ stream *s = ob->ptr;
+ raxStart(&ri,s->rax);
+ if (*cursor == 0) {
+ /* if cursor is 0, we start a new iteration */
+ defragRaxNode(&s->rax->head);
+ /* assign the iterator node callback before the seek, so that the
+ * initial nodes that are processed till the first item are covered */
+ ri.node_cb = defragRaxNode;
+ raxSeek(&ri,"^",NULL,0);
+ } else {
+ /* if cursor is non-zero, we seek to the static 'last' */
+ if (!raxSeek(&ri,">", last, sizeof(last))) {
+ *cursor = 0;
+ return 0;
+ }
+ /* assign the iterator node callback after the seek, so that the
+ * nodes traversed by the seek (already covered in a previous call) aren't processed again */
+ ri.node_cb = defragRaxNode;
+ }
+
+ (*cursor)++;
+ while (raxNext(&ri)) {
+ void *newdata = activeDefragAlloc(ri.data);
+ if (newdata)
+ raxSetData(ri.node, ri.data=newdata), (*defragged)++;
+ if (++iterations > 16) {
+ if (ustime() > endtime) {
+ serverAssert(ri.key_len==sizeof(last));
+ memcpy(last,ri.key,ri.key_len);
+ raxStop(&ri);
+ return 1;
+ }
+ iterations = 0;
+ }
+ }
+ raxStop(&ri);
+ *cursor = 0;
+ return 0;
+}
+
+/* optional callback used to defrag each rax element (not including the element pointer itself) */
+typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
+
+/* defrag a radix tree, including:
+ * 1) rax struct
+ * 2) rax nodes
+ * 3) rax entry data (only if defrag_data is specified)
+ * 4) call a callback per element, and allow the callback to return a new pointer for the element */
+long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
+ long defragged = 0;
+ raxIterator ri;
+ rax* rax;
+ if ((rax = activeDefragAlloc(*raxref)))
+ defragged++, *raxref = rax;
+ rax = *raxref;
+ raxStart(&ri,rax);
+ ri.node_cb = defragRaxNode;
+ defragRaxNode(&rax->head);
+ raxSeek(&ri,"^",NULL,0);
+ while (raxNext(&ri)) {
+ void *newdata = NULL;
+ if (element_cb)
+ newdata = element_cb(&ri, element_cb_data, &defragged);
+ if (defrag_data && !newdata)
+ newdata = activeDefragAlloc(ri.data);
+ if (newdata)
+ raxSetData(ri.node, ri.data=newdata), defragged++;
+ }
+ raxStop(&ri);
+ return defragged;
+}
+
+typedef struct {
+ streamCG *cg;
+ streamConsumer *c;
+} PendingEntryContext;
+
+void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
+ UNUSED(defragged);
+ PendingEntryContext *ctx = privdata;
+ streamNACK *nack = ri->data, *newnack;
+ nack->consumer = ctx->c; /* update nack pointer to consumer */
+ newnack = activeDefragAlloc(nack);
+ if (newnack) {
+ /* update consumer group pointer to the nack */
+ void *prev;
+ raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
+ serverAssert(prev==nack);
+ /* note: we don't increment 'defragged' that's done by the caller */
+ }
+ return newnack;
+}
+
+void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
+ streamConsumer *c = ri->data;
+ streamCG *cg = privdata;
+ void *newc = activeDefragAlloc(c);
+ if (newc) {
+ /* note: we don't increment 'defragged' that's done by the caller */
+ c = newc;
+ }
+ sds newsds = activeDefragSds(c->name);
+ if (newsds)
+ (*defragged)++, c->name = newsds;
+ if (c->pel) {
+ PendingEntryContext pel_ctx = {cg, c};
+ *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
+ }
+ return newc; /* returns NULL if c was not defragged */
+}
+
+void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
+ streamCG *cg = ri->data;
+ UNUSED(privdata);
+ if (cg->consumers)
+ *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
+ if (cg->pel)
+ *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
+ return NULL;
+}
+
+long defragStream(redisDb *db, dictEntry *kde) {
+ long defragged = 0;
+ robj *ob = dictGetVal(kde);
+ serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
+ stream *s = ob->ptr, *news;
+
+ /* handle the main struct */
+ if ((news = activeDefragAlloc(s)))
+ defragged++, ob->ptr = s = news;
+
+ if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
+ rax *newrax = activeDefragAlloc(s->rax);
+ if (newrax)
+ defragged++, s->rax = newrax;
+ defragLater(db, kde);
+ } else
+ defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
+
+ if (s->cgroups)
+ defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
+ return defragged;
+}
+
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
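
The scan function added above is the resumable, time-budgeted half of this change: work on a single big stream can be split across calls via the cursor and the static 'last' stream ID, and the clock is consulted only once every 16 iterations. Below is a standalone sketch of that control-flow pattern, with a plain array and a hypothetical process_item() standing in for the rax iterator and activeDefragAlloc().

    /* Sketch: do a bounded amount of work per call, check the clock only once
     * every 16 steps, and remember where to resume.  Returns 0 when the scan
     * completed (cursor reset to 0), 1 when time ran out and the caller should
     * call again later with the same cursor. */
    #include <stddef.h>
    #include <sys/time.h>

    static long long ustime(void) {       /* microseconds, like the Redis helper */
        struct timeval tv;
        gettimeofday(&tv, NULL);
        return ((long long)tv.tv_sec) * 1000000 + tv.tv_usec;
    }

    static void process_item(int *item) { (void)item; }  /* placeholder for the defrag work */

    int scan_later_sketch(int *items, size_t nitems, size_t *cursor, long long endtime) {
        long iterations = 0;
        while (*cursor < nitems) {
            process_item(&items[*cursor]);
            (*cursor)++;
            if (++iterations > 16) {      /* don't call ustime() on every element */
                if (ustime() > endtime) return 1;
                iterations = 0;
            }
        }
        *cursor = 0;                      /* done; the next call starts a fresh scan */
        return 0;
    }
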
@@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) {
} else {
serverPanic("Unknown hash encoding");
}
+ } else if (ob->type == OBJ_STREAM) {
+ defragged += defragStream(db, de);
} else if (ob->type == OBJ_MODULE) {
/* Currently defragmenting modules private data types
* is not supported. */
@@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
server.stat_active_defrag_scanned++;
}
-/* Defrag scan callback for for each hash table bicket,
+/* Defrag scan callback for each hash table bucket,
* used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
@@ -728,27 +895,29 @@ long defragOtherGlobals() {
return defragged;
}
-unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) {
- long defragged = 0;
+/* returns 0 if it finished in time (more work may still be needed, as indicated by a non-zero cursor),
+ * and 1 if time is up and more work is needed. */
+int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
- defragged += scanLaterList(ob);
- cursor = 0; /* list has no scan, we must finish it in one go */
+ server.stat_active_defrag_hits += scanLaterList(ob);
+ *cursor = 0; /* list has no scan, we must finish it in one go */
} else if (ob->type == OBJ_SET) {
- defragged += scanLaterSet(ob, &cursor);
+ server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
- defragged += scanLaterZset(ob, &cursor);
+ server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH) {
- defragged += scanLaterHash(ob, &cursor);
+ server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
+ } else if (ob->type == OBJ_STREAM) {
+ return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else {
- cursor = 0; /* object type may have changed since we schedule it for later */
+ *cursor = 0; /* object type may have changed since we schedule it for later */
}
} else {
- cursor = 0; /* object may have been deleted already */
+ *cursor = 0; /* object may have been deleted already */
}
- server.stat_active_defrag_hits += defragged;
- return cursor;
+ return 0;
}
/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
@@ -788,17 +957,22 @@ int defragLaterStep(redisDb *db, long long endtime) {
dictEntry *de = dictFind(db->dict, current_key);
key_defragged = server.stat_active_defrag_hits;
do {
- cursor = defragLaterItem(de, cursor);
+ int quit = 0;
+ if (defragLaterItem(de, &cursor, endtime))
+ quit = 1; /* time is up, we didn't finish all the work */
+
+ /* Don't start a new BIG key in this loop, because the
+ * next key can be a list, and scanLaterList must be done in one cycle */
+ if (!cursor)
+ quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
- * check if we reached the time limit.
- * But regardless, don't start a new BIG key in this loop, this is because the
- * next key can be a list, and scanLaterList must be done in once cycle */
- if (!cursor || (++iterations > 16 ||
+ * check if we reached the time limit. */
+ if (quit || (++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
- if (!cursor || ustime() > endtime) {
+ if (quit || ustime() > endtime) {
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else
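
One subtle point in the consumer-group code above: a streamNACK is referenced both from the consumer's PEL and from the group's PEL, so once the allocation moves, the second tree is repaired by re-inserting the same key, relying on raxInsert() overwriting the stored value and handing back the stale pointer for a sanity check. Below is a small sketch of that contract; it assumes it is built together with rax.c from this tree, and the key and payload are illustrative stand-ins.

    #include <assert.h>
    #include <stdlib.h>
    #include <string.h>
    #include "rax.h"

    int main(void) {
        rax *pel = raxNew();
        unsigned char key[] = "00000001";          /* stand-in for a binary stream ID key */
        void *nack = calloc(1, 16);                /* stand-in for a streamNACK */
        raxInsert(pel, key, sizeof(key)-1, nack, NULL);

        /* defrag moves the allocation... */
        void *newnack = calloc(1, 16);
        memcpy(newnack, nack, 16);

        /* ...so the tree entry is overwritten in place: raxInsert() returns 0
         * for an update and gives back the previous value, which must be the
         * old nack (same idea as the serverAssert(prev==nack) above). */
        void *prev = NULL;
        int inserted = raxInsert(pel, key, sizeof(key)-1, newnack, &prev);
        assert(inserted == 0 && prev == nack);

        free(nack);
        free(newnack);
        raxFree(pel);
        return 0;
    }
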
diff --git a/src/rax.c b/src/rax.c
index 8764dd8c9..fff580272 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -1167,6 +1167,7 @@ void raxStart(raxIterator *it, rax *rt) {
it->key = it->key_static_string;
it->key_max = RAX_ITER_STATIC_LEN;
it->data = NULL;
+ it->node_cb = NULL;
raxStackInit(&it->stack);
}
@@ -1240,6 +1241,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
if (!raxIteratorAddChars(it,it->node->data,
it->node->iscompr ? it->node->size : 1)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
+ if (it->node_cb && it->node_cb(&it->node))
+ memcpy(cp,&it->node,sizeof(it->node));
/* For "next" step, stop every time we find a key along the
* way, since the key is lexicographically smaller compared to
* what follows in the sub-children. */
@@ -1292,6 +1295,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
raxIteratorAddChars(it,it->node->data+i,1);
if (!raxStackPush(&it->stack,it->node)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
+ if (it->node_cb && it->node_cb(&it->node))
+ memcpy(cp,&it->node,sizeof(it->node));
if (it->node->iskey) {
it->data = raxGetData(it->node);
return 1;
@@ -1325,7 +1330,7 @@ int raxSeekGreatest(raxIterator *it) {
/* Like raxIteratorNextStep() but implements an iteration step moving
* to the lexicographically previous element. The 'noup' option has a similar
- * effect to the one of raxIteratorPrevSte(). */
+ * effect to the one of raxIteratorNextStep(). */
int raxIteratorPrevStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
return 1;
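
The two memcpy(cp,&it->node,sizeof(it->node)) additions implement a small contract: the callback may reallocate the node it is handed, and when it reports a change the iterator writes the fresh pointer back into the parent slot it was just read from. Below is a standalone sketch of that contract, with a hypothetical node_t and callback in place of raxNode and the defrag callback.

    #include <stdlib.h>
    #include <string.h>

    typedef struct node { int payload; } node_t;
    typedef int (*node_callback)(node_t **noderef);   /* mirrors raxNodeCallback */

    /* Example callback: move the node to a fresh allocation (what
     * activeDefragAlloc() does conditionally) and report the change. */
    static int relocate_node(node_t **noderef) {
        node_t *moved = malloc(sizeof(**noderef));
        if (!moved) return 0;
        memcpy(moved, *noderef, sizeof(**noderef));
        free(*noderef);
        *noderef = moved;
        return 1;
    }

    /* Caller side: visit a child through the slot that references it, and
     * patch the slot if the callback relocated the node; this is the same
     * role the memcpy(cp,&it->node,...) lines play in raxIteratorNextStep(). */
    static void visit_child(node_t **parent_slot, node_callback cb) {
        node_t *node = *parent_slot;
        if (cb && cb(&node))
            *parent_slot = node;
    }

    int main(void) {
        node_t *root = malloc(sizeof(*root));
        root->payload = 42;
        visit_child(&root, relocate_node);   /* root now points at the moved copy */
        int ok = (root->payload == 42);
        free(root);
        return ok ? 0 : 1;
    }
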
diff --git a/src/rax.h b/src/rax.h
index 5b4d45167..9e6bc0b51 100644
--- a/src/rax.h
+++ b/src/rax.h
@@ -119,6 +119,12 @@ typedef struct raxStack {
int oom; /* True if pushing into this stack failed for OOM at some point. */
} raxStack;
+/* Optional callback used for iterators to be notified on each rax node.
+ * This is used by active defrag; the return value is an indication that
+ * the noderef was changed, and the tree needs to be updated.
+ * This is currently only supported in forward iterations (raxNext) */
+typedef int (*raxNodeCallback)(raxNode **noderef);
+
/* Radix tree iterator state is encapsulated into this data structure. */
#define RAX_ITER_STATIC_LEN 128
#define RAX_ITER_JUST_SEEKED (1<<0) /* Iterator was just seeked. Return current
@@ -137,6 +143,7 @@ typedef struct raxIterator {
unsigned char key_static_string[RAX_ITER_STATIC_LEN];
raxNode *node; /* Current node. Only for unsafe iteration. */
raxStack stack; /* Stack used for unsafe iteration. */
+ raxNodeCallback node_cb;
} raxIterator;
/* A special pointer returned for not found items. */
@@ -161,4 +168,7 @@ int raxEOF(raxIterator *it);
void raxShow(rax *rax);
uint64_t raxSize(rax *rax);
+/* internals */
+void raxSetData(raxNode *n, void *data);
+
#endif
diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl
index d645b2b54..f51a3c13d 100644
--- a/tests/integration/rdb.tcl
+++ b/tests/integration/rdb.tcl
@@ -48,6 +48,8 @@ start_server [list overrides [list "dir" $server_path]] {
r xadd stream * bar $j
}
}
+ r xgroup create stream mygroup $
+ r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream >
set digest [r debug digest]
r debug reload
set newdigest [r debug digest]
diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index 1796d695c..ed37a68ed 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -97,10 +97,15 @@ start_server {tags {"defrag"}} {
r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 0
r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes
+ r config set stream-node-max-entries 5
r hmset hash h1 v1 h2 v2 h3 v3
r lpush list a b c d
r zadd zset 0 a 1 b 2 c 3 d
r sadd set a b c d
+ r xadd stream * item 1 value a
+ r xadd stream * item 2 value b
+ r xgroup create stream mygroup $
+ r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream >
# create big keys with 10k items
set rd [redis_deferring_client]
@@ -109,8 +114,9 @@ start_server {tags {"defrag"}} {
$rd lpush biglist [concat "asdfasdfasdf" $j]
$rd zadd bigzset $j [concat "asdfasdfasdf" $j]
$rd sadd bigset [concat "asdfasdfasdf" $j]
+ $rd xadd bigstream * item 1 value a
}
- for {set j 0} {$j < 40000} {incr j} {
+ for {set j 0} {$j < 50000} {incr j} {
$rd read ; # Discard replies
}
@@ -134,7 +140,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
}
- assert {[r dbsize] == 500008}
+ assert {[r dbsize] == 500010}
# create some fragmentation
for {set j 0} {$j < 500000} {incr j 2} {
@@ -143,7 +149,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j 2} {
$rd read ; # Discard replies
}
- assert {[r dbsize] == 250008}
+ assert {[r dbsize] == 250010}
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
@@ -155,6 +161,7 @@ start_server {tags {"defrag"}} {
r config set latency-monitor-threshold 5
r latency reset
+ set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
@@ -193,9 +200,11 @@ start_server {tags {"defrag"}} {
# due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 75ms
assert {$max_latency <= 80}
- } else {
- set _ ""
}
- } {}
+ # verify the data isn't corrupted or changed
+ set newdigest [r debug digest]
+ assert {$digest eq $newdigest}
+ r save ;# saving an rdb iterates over all the data / pointers
+ } {OK}
}
}