summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf7
-rw-r--r--src/config.c6
-rw-r--r--src/evict.c44
-rw-r--r--src/lazyfree.c25
-rw-r--r--src/multi.c2
-rw-r--r--src/networking.c217
-rw-r--r--src/object.c30
-rw-r--r--src/replication.c520
-rw-r--r--src/server.c52
-rw-r--r--src/server.h81
-rw-r--r--tests/integration/psync2-master-restart.tcl5
-rw-r--r--tests/integration/replication-buffer.tcl218
-rw-r--r--tests/integration/replication.tcl7
-rw-r--r--tests/test_helper.tcl1
-rw-r--r--tests/unit/maxmemory.tcl8
-rw-r--r--tests/unit/moduleapi/testrdb.tcl7
16 files changed, 934 insertions, 296 deletions
diff --git a/redis.conf b/redis.conf
index 108b9ff63..368cf3a73 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1836,6 +1836,13 @@ activerehashing yes
# Instead there is a default limit for pubsub and replica clients, since
# subscribers and replicas receive data in a push fashion.
#
+# Note that it doesn't make sense to set the replica clients output buffer
+# limit lower than the repl-backlog-size config (partial sync will succeed
+# and then replica will get disconnected).
+# Such a configuration is ignored (the size of repl-backlog-size will be used).
+# This doesn't have memory consumption implications since the replica client
+# will share the backlog buffers memory.
+#
# Both the hard or the soft limit can be disabled by setting them to zero.
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit replica 256mb 64mb 60
diff --git a/src/config.c b/src/config.c
index c044ca516..909929147 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2399,10 +2399,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) {
}
static int updateReplBacklogSize(long long val, long long prev, const char **err) {
- /* resizeReplicationBacklog sets server.cfg_repl_backlog_size, and relies on
+ /* resizeReplicationBacklog sets server.repl_backlog_size, and relies on
* being able to tell when the size changes, so restore prev before calling it. */
UNUSED(err);
- server.cfg_repl_backlog_size = prev;
+ server.repl_backlog_size = prev;
resizeReplicationBacklog(val);
return 1;
}
@@ -2684,7 +2684,7 @@ standardConfig configs[] = {
createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("proto-max-bulk-len", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
- createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.cfg_repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
+ createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
diff --git a/src/evict.c b/src/evict.c
index 954f6f402..4186378a2 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -325,22 +325,44 @@ unsigned long LFUDecrAndReturn(robj *o) {
}
/* We don't want to count AOF buffers and slaves output buffers as
- * used memory: the eviction should use mostly data size. This function
- * returns the sum of AOF and slaves buffer. */
+ * used memory: the eviction should use mostly data size, because
+ * it can cause feedback-loop when we push DELs into them, putting
+ * more and more DELs will make them bigger, if we count them, we
+ * need to evict more keys, and then generate more DELs, maybe cause
+ * massive eviction loop, even all keys are evicted.
+ *
+ * This function returns the sum of AOF and replication buffer. */
size_t freeMemoryGetNotCountedMemory(void) {
size_t overhead = 0;
- int slaves = listLength(server.slaves);
- if (slaves) {
- listIter li;
- listNode *ln;
-
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = listNodeValue(ln);
- overhead += getClientOutputBufferMemoryUsage(slave);
+ /* Since all replicas and replication backlog share global replication
+ * buffer, we think only the part of exceeding backlog size is the extra
+ * separate consumption of replicas.
+ *
+ * Note that although the backlog is also initially incrementally grown
+ * (pushing DELs consumes memory), it'll eventually stop growing and
+ * remain constant in size, so even if its creation will cause some
+ * eviction, it's capped, and also here to stay (no resonance effect)
+ *
+ * Note that, because we trim backlog incrementally in the background,
+ * backlog size may exceeds our setting if slow replicas that reference
+ * vast replication buffer blocks disconnect. To avoid massive eviction
+ * loop, we don't count the delayed freed replication backlog into used
+ * memory even if there are no replicas, i.e. we still regard this memory
+ * as replicas'. */
+ if ((long long)server.repl_buffer_mem > server.repl_backlog_size) {
+ /* We use list structure to manage replication buffer blocks, so backlog
+ * also occupies some extra memory, we can't know exact blocks numbers,
+ * we only get approximate size according to per block size. */
+ size_t extra_approx_size =
+ (server.repl_backlog_size/PROTO_REPLY_CHUNK_BYTES + 1) *
+ (sizeof(replBufBlock)+sizeof(listNode));
+ size_t counted_mem = server.repl_backlog_size + extra_approx_size;
+ if (server.repl_buffer_mem > counted_mem) {
+ overhead += (server.repl_buffer_mem - counted_mem);
}
}
+
if (server.aof_state != AOF_OFF) {
overhead += sdsAllocSize(server.aof_buf)+aofRewriteBufferMemoryUsage();
}
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 10f1ab39f..6127abe77 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -46,6 +46,18 @@ void lazyFreeLuaScripts(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
+/* Release replication backlog referencing memory. */
+void lazyFreeReplicationBacklogRefMem(void *args[]) {
+ list *blocks = args[0];
+ rax *index = args[1];
+ long long len = listLength(blocks);
+ len += raxSize(index);
+ listRelease(blocks);
+ raxFree(index);
+ atomicDecr(lazyfree_objects,len);
+ atomicIncr(lazyfreed_objects,len);
+}
+
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
size_t aux;
@@ -180,3 +192,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) {
dictRelease(lua_scripts);
}
}
+
+/* Free replication backlog referencing buffer blocks and rax index. */
+void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
+ if (listLength(blocks) > LAZYFREE_THRESHOLD ||
+ raxSize(index) > LAZYFREE_THRESHOLD)
+ {
+ atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
+ bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
+ } else {
+ listRelease(blocks);
+ raxFree(index);
+ }
+}
diff --git a/src/multi.c b/src/multi.c
index 5cc846637..e1a1df93f 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -276,7 +276,7 @@ void execCommand(client *c) {
* backlog with the final EXEC. */
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
- feedReplicationBacklog(execcmd,strlen(execcmd));
+ feedReplicationBuffer(execcmd,strlen(execcmd));
}
afterPropagateExec();
}
diff --git a/src/networking.c b/src/networking.c
index d514c11a5..9bb12787b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -140,6 +140,8 @@ client *createClient(connection *conn) {
c->name = NULL;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
+ c->ref_repl_buf_node = NULL;
+ c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
@@ -467,7 +469,7 @@ void afterErrorReply(client *c, const char *s, size_t len) {
"to its %s: '%.*s' after processing the command "
"'%s'", from, to, (int)len, s, cmdname);
if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
- server.repl_backlog_histlen > 0)
+ server.repl_backlog->histlen > 0)
{
showLatestBacklog();
}
@@ -985,30 +987,37 @@ void AddReplyFromClient(client *dst, client *src) {
closeClientOnOutputBufferLimitReached(dst, 1);
}
-/* Copy 'src' client output buffers into 'dst' client output buffers.
- * The function takes care of freeing the old output buffers of the
- * destination client. */
-void copyClientOutputBuffer(client *dst, client *src) {
- listEmpty(dst->reply);
- dst->sentlen = 0;
- dst->bufpos = 0;
- dst->reply_bytes = 0;
+/* Logically copy 'src' replica client buffers info to 'dst' replica.
+ * Basically increase referenced buffer block node reference count. */
+void copyReplicaOutputBuffer(client *dst, client *src) {
+ serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
- /* First copy src static buffer into dst (either static buffer or reply
- * list, maybe clients have different 'usable_buffer_size'). */
- _addReplyToBufferOrList(dst,src->buf,src->bufpos);
-
- /* Copy src reply list into the dest. */
- list* reply = listDup(src->reply);
- listJoin(dst->reply,reply);
- dst->reply_bytes += src->reply_bytes;
- listRelease(reply);
+ if (src->ref_repl_buf_node == NULL) return;
+ dst->ref_repl_buf_node = src->ref_repl_buf_node;
+ dst->ref_block_pos = src->ref_block_pos;
+ ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
}
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
- return c->bufpos || listLength(c->reply);
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ /* Replicas use global shared replication buffer instead of
+ * private output buffer. */
+ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
+ if (c->ref_repl_buf_node == NULL) return 0;
+
+ /* If the last replication buffer block content is totally sent,
+ * we have nothing to send. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = listNodeValue(ln);
+ if (ln == c->ref_repl_buf_node &&
+ c->ref_block_pos == tail->used) return 0;
+
+ return 1;
+ } else {
+ return c->bufpos || listLength(c->reply);
+ }
}
void clientAcceptHandler(connection *conn) {
@@ -1395,6 +1404,7 @@ void freeClient(client *c) {
/* Free data structures. */
listRelease(c->reply);
+ freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
@@ -1542,6 +1552,77 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
+/* This function does actual writing output buffers to different types of
+ * clients, it is called by writeToClient.
+ * If we write successfully, it return C_OK, otherwise, C_ERR is returned,
+ * And 'nwritten' is a output parameter, it means how many bytes server write
+ * to client. */
+int _writeToClient(client *c, ssize_t *nwritten) {
+ *nwritten = 0;
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
+
+ replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
+ serverAssert(o->used >= c->ref_block_pos);
+ /* Send current block if it is not fully sent. */
+ if (o->used > c->ref_block_pos) {
+ *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
+ o->used-c->ref_block_pos);
+ if (*nwritten <= 0) return C_ERR;
+ c->ref_block_pos += *nwritten;
+ }
+
+ /* If we fully sent the object on head, go to the next one. */
+ listNode *next = listNextNode(c->ref_repl_buf_node);
+ if (next && c->ref_block_pos == o->used) {
+ o->refcount--;
+ ((replBufBlock *)(listNodeValue(next)))->refcount++;
+ c->ref_repl_buf_node = next;
+ c->ref_block_pos = 0;
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+ }
+ return C_OK;
+ }
+
+ if (c->bufpos > 0) {
+ *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
+ if (*nwritten <= 0) return C_ERR;
+ c->sentlen += *nwritten;
+
+ /* If the buffer was sent, set bufpos to zero to continue with
+ * the remainder of the reply. */
+ if ((int)c->sentlen == c->bufpos) {
+ c->bufpos = 0;
+ c->sentlen = 0;
+ }
+ } else {
+ clientReplyBlock *o = listNodeValue(listFirst(c->reply));
+ size_t objlen = o->used;
+
+ if (objlen == 0) {
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply,listFirst(c->reply));
+ return C_OK;
+ }
+
+ *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
+ if (*nwritten <= 0) return C_ERR;
+ c->sentlen += *nwritten;
+
+ /* If we fully sent the object on head go to the next one */
+ if (c->sentlen == objlen) {
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply,listFirst(c->reply));
+ c->sentlen = 0;
+ /* If there are no longer objects in the list, we expect
+ * the count of reply bytes to be exactly zero. */
+ if (listLength(c->reply) == 0)
+ serverAssert(c->reply_bytes == 0);
+ }
+ }
+ return C_OK;
+}
+
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
* error. If handler_installed is set, it will attempt to clear the
@@ -1555,48 +1636,11 @@ int writeToClient(client *c, int handler_installed) {
atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
- size_t objlen;
- clientReplyBlock *o;
while(clientHasPendingReplies(c)) {
- if (c->bufpos > 0) {
- nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
- if (nwritten <= 0) break;
- c->sentlen += nwritten;
- totwritten += nwritten;
-
- /* If the buffer was sent, set bufpos to zero to continue with
- * the remainder of the reply. */
- if ((int)c->sentlen == c->bufpos) {
- c->bufpos = 0;
- c->sentlen = 0;
- }
- } else {
- o = listNodeValue(listFirst(c->reply));
- objlen = o->used;
-
- if (objlen == 0) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- continue;
- }
-
- nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
- if (nwritten <= 0) break;
- c->sentlen += nwritten;
- totwritten += nwritten;
-
- /* If we fully sent the object on head go to the next one */
- if (c->sentlen == objlen) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- c->sentlen = 0;
- /* If there are no longer objects in the list, we expect
- * the count of reply bytes to be exactly zero. */
- if (listLength(c->reply) == 0)
- serverAssert(c->reply_bytes == 0);
- }
- }
+ int ret = _writeToClient(c, &nwritten);
+ if (ret == C_ERR) break;
+ totwritten += nwritten;
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
@@ -2077,8 +2121,7 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->pending_querybuf, applied);
+ replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
@@ -2399,6 +2442,13 @@ sds catClientInfoString(sds s, client *client) {
/* Compute the total memory consumed by this client. */
size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
+ size_t used_blocks_of_repl_buf = 0;
+ if (client->ref_repl_buf_node) {
+ replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
+ replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
+ used_blocks_of_repl_buf = last->id - cur->id + 1;
+ }
+
sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL;
sds ret = sdscatfmt(s,
"id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
@@ -2419,7 +2469,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) client->argv_len_sum,
(unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
- (unsigned long long) listLength(client->reply),
+ (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
(unsigned long long) total_mem,
events,
@@ -3247,8 +3297,21 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
size_t getClientOutputBufferMemoryUsage(client *c) {
- size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
- return c->reply_bytes + (list_item_size*listLength(c->reply));
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ size_t repl_buf_size = 0;
+ size_t repl_node_num = 0;
+ size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
+ if (c->ref_repl_buf_node) {
+ replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
+ replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
+ repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
+ repl_node_num = last->id - cur->id + 1;
+ }
+ return repl_buf_size + (repl_node_size*repl_node_num);
+ } else {
+ size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
+ return c->reply_bytes + (list_item_size*listLength(c->reply));
+ }
}
/* Returns the total client's memory usage.
@@ -3332,8 +3395,18 @@ int checkClientOutputBufferLimits(client *c) {
* like normal clients. */
if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
+ /* Note that it doesn't make sense to set the replica clients output buffer
+ * limit lower than the repl-backlog-size config (partial sync will succeed
+ * and then replica will get disconnected).
+ * Such a configuration is ignored (the size of repl-backlog-size will be used).
+ * This doesn't have memory consumption implications since the replica client
+ * will share the backlog buffers memory. */
+ size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
+ if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
+ (long long)hard_limit_bytes < server.repl_backlog_size)
+ hard_limit_bytes = server.repl_backlog_size;
if (server.client_obuf_limits[class].hard_limit_bytes &&
- used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
+ used_mem >= hard_limit_bytes)
hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
@@ -3375,7 +3448,10 @@ int checkClientOutputBufferLimits(client *c) {
int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (!c->conn) return 0; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
- if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0;
+ /* Note that c->reply_bytes is irrelevant for replica clients
+ * (they use the global repl buffers). */
+ if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
+ c->flags & CLIENT_CLOSE_ASAP) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
@@ -3740,6 +3816,15 @@ int handleClientsWithPendingWritesUsingThreads(void) {
continue;
}
+ /* Since all replicas and replication backlog use global replication
+ * buffer, to guarantee data accessing thread safe, we must put all
+ * replicas client into io_threads_list[0] i.e. main thread handles
+ * sending the output buffer of all replicas. */
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ listAddNodeTail(io_threads_list[0],c);
+ continue;
+ }
+
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
diff --git a/src/object.c b/src/object.c
index 4c62df3f0..c2d917ae4 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1172,20 +1172,34 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mem_total += server.initial_memory_usage;
- mem = 0;
- if (server.repl_backlog)
- mem += zmalloc_size(server.repl_backlog);
- mh->repl_backlog = mem;
- mem_total += mem;
+ /* Replication backlog and replicas share one global replication buffer,
+ * only if replication buffer memory is more than the repl backlog setting,
+ * we consider the excess as replicas' memory. Otherwise, replication buffer
+ * memory is the consumption of repl backlog. */
+ if (listLength(server.slaves) &&
+ (long long)server.repl_buffer_mem > server.repl_backlog_size)
+ {
+ mh->clients_slaves = server.repl_buffer_mem - server.repl_backlog_size;
+ mh->repl_backlog = server.repl_backlog_size;
+ } else {
+ mh->clients_slaves = 0;
+ mh->repl_backlog = server.repl_buffer_mem;
+ }
+ if (server.repl_backlog) {
+ /* The approximate memory of rax tree for indexed blocks. */
+ mh->repl_backlog +=
+ server.repl_backlog->blocks_index->numnodes * sizeof(raxNode) +
+ raxSize(server.repl_backlog->blocks_index) * sizeof(void*);
+ }
+ mem_total += mh->repl_backlog;
+ mem_total += mh->clients_slaves;
/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
* updateClientMemUsage(). */
- mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE];
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
- mem_total += mh->clients_slaves;
mem_total += mh->clients_normal;
mem = 0;
@@ -1312,7 +1326,7 @@ sds getMemoryDoctorReport(void) {
}
/* Slaves using more than 10 MB each? */
- if (numslaves > 0 && mh->clients_slaves / numslaves > (1024*1024*10)) {
+ if (numslaves > 0 && mh->clients_slaves > (1024*1024*10)) {
big_slave_buf = 1;
num_reports++;
}
diff --git a/src/replication.c b/src/replication.c
index 1c8836c02..0629a4ca9 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -33,6 +33,7 @@
#include "cluster.h"
#include "bio.h"
+#include <memory.h>
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
@@ -109,83 +110,94 @@ int bg_unlink(const char *filename) {
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
- server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
- server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
-
+ server.repl_backlog = zmalloc(sizeof(replBacklog));
+ server.repl_backlog->ref_repl_buf_node = NULL;
+ server.repl_backlog->unindexed_count = 0;
+ server.repl_backlog->blocks_index = raxNew();
+ server.repl_backlog->histlen = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
- server.repl_backlog_off = server.master_repl_offset+1;
+ server.repl_backlog->offset = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
- * server.cfg_repl_backlog_size and to resize the buffer and setup it so that
+ * server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
void resizeReplicationBacklog(long long newsize) {
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
- if (server.cfg_repl_backlog_size == newsize) return;
+ if (server.repl_backlog_size == newsize) return;
- server.cfg_repl_backlog_size = newsize;
- if (server.repl_backlog != NULL) {
- /* What we actually do is to flush the old buffer and realloc a new
- * empty one. It will refill with new data incrementally.
- * The reason is that copying a few gigabytes adds latency and even
- * worse often we need to alloc additional space before freeing the
- * old buffer. */
- zfree(server.repl_backlog);
- server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
- server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
- /* Next byte we have is... the next since the buffer is empty. */
- server.repl_backlog_off = server.master_repl_offset+1;
- }
+ server.repl_backlog_size = newsize;
+ if (server.repl_backlog)
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
+ if (server.repl_backlog == NULL) return;
+
+ /* Decrease the start buffer node reference count. */
+ if (server.repl_backlog->ref_repl_buf_node) {
+ replBufBlock *o = listNodeValue(
+ server.repl_backlog->ref_repl_buf_node);
+ serverAssert(o->refcount == 1); /* Last reference. */
+ o->refcount--;
+ }
+
+ /* Replication buffer blocks are completely released when we free the
+ * backlog, since the backlog is released only when there are no replicas
+ * and the backlog keeps the last reference of all blocks. */
+ freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks,
+ server.repl_backlog->blocks_index);
+ resetReplicationBuffer();
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
-/* Add data to the replication backlog.
- * This function also increments the global replication offset stored at
- * server.master_repl_offset, because there is no case where we want to feed
- * the backlog without incrementing the offset. */
-void feedReplicationBacklog(void *ptr, size_t len) {
- unsigned char *p = ptr;
+void resetReplicationBuffer(void) {
+ server.repl_buffer_mem = 0;
+ server.repl_buffer_blocks = listCreate();
+ listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree);
+}
- server.master_repl_offset += len;
+int canFeedReplicaReplBuffer(client *replica) {
+ /* Don't feed replicas that only want the RDB. */
+ if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
- /* This is a circular buffer, so write as much data we can at every
- * iteration and rewind the "idx" index if we reach the limit. */
- while(len) {
- size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
- if (thislen > len) thislen = len;
- memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
- server.repl_backlog_idx += thislen;
- if (server.repl_backlog_idx == server.repl_backlog_size)
- server.repl_backlog_idx = 0;
- len -= thislen;
- p += thislen;
- server.repl_backlog_histlen += thislen;
- }
- if (server.repl_backlog_histlen > server.repl_backlog_size)
- server.repl_backlog_histlen = server.repl_backlog_size;
- /* Set the offset of the first byte we have in the backlog. */
- server.repl_backlog_off = server.master_repl_offset -
- server.repl_backlog_histlen + 1;
+ /* Don't feed replicas that are still waiting for BGSAVE to start. */
+ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
+
+ return 1;
}
-/* Wrapper for feedReplicationBacklog() that takes Redis string objects
+/* Similar with 'prepareClientToWrite', note that we must call this function
+ * before feeding replication stream into global replication buffer, since
+ * clientHasPendingReplies in prepareClientToWrite will access the global
+ * replication buffer to make judgements. */
+int prepareReplicasToWrite(void) {
+ listIter li;
+ listNode *ln;
+ int prepared = 0;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+ if (prepareClientToWrite(slave) == C_ERR) continue;
+ prepared++;
+ }
+
+ return prepared;
+}
+
+/* Wrapper for feedReplicationBuffer() that takes Redis string objects
* as input. */
-void feedReplicationBacklogWithObject(robj *o) {
+void feedReplicationBufferWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
@@ -197,27 +209,197 @@ void feedReplicationBacklogWithObject(robj *o) {
len = sdslen(o->ptr);
p = o->ptr;
}
- feedReplicationBacklog(p,len);
+ feedReplicationBuffer(p,len);
}
-int canFeedReplicaReplBuffer(client *replica) {
- /* Don't feed replicas that only want the RDB. */
- if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
+/* Generally, we only have one replication buffer block to trim when replication
+ * backlog size exceeds our setting and no replica reference it. But if replica
+ * clients disconnect, we need to free many replication buffer blocks that are
+ * referenced. It would cost much time if there are a lots blocks to free, that
+ * will freeze server, so we trim replication backlog incrementally. */
+void incrementalTrimReplicationBacklog(size_t max_blocks) {
+ serverAssert(server.repl_backlog != NULL);
- /* Don't feed replicas that are still waiting for BGSAVE to start. */
- if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
+ size_t trimmed_blocks = 0, trimmed_bytes = 0;
+ while (server.repl_backlog->histlen > server.repl_backlog_size &&
+ trimmed_blocks < max_blocks)
+ {
+ /* We never trim backlog to less than one block. */
+ if (listLength(server.repl_buffer_blocks) <= 1) break;
+
+ /* Replicas increment the refcount of the first replication buffer block
+ * they refer to, in that case, we don't trim the backlog even if
+ * backlog_histlen exceeds backlog_size. This implicitly makes backlog
+ * bigger than our setting, but makes the master accept partial resync as
+ * much as possible. So that backlog must be the last reference of
+ * replication buffer blocks. */
+ listNode *first = listFirst(server.repl_buffer_blocks);
+ serverAssert(first == server.repl_backlog->ref_repl_buf_node);
+ replBufBlock *fo = listNodeValue(first);
+ if (fo->refcount != 1) break;
+
+ /* We don't try trim backlog if backlog valid size will be lessen than
+ * setting backlog size once we release the first repl buffer block. */
+ if (server.repl_backlog->histlen - (long long)fo->size <=
+ server.repl_backlog_size) break;
+
+ /* Decr refcount and release the first block later. */
+ fo->refcount--;
+ trimmed_bytes += fo->size;
+ trimmed_blocks++;
+
+ /* Go to use next replication buffer block node. */
+ listNode *next = listNextNode(first);
+ server.repl_backlog->ref_repl_buf_node = next;
+ serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
+ /* Incr reference count to keep the new head node. */
+ ((replBufBlock *)listNodeValue(next))->refcount++;
+
+ /* Remove the node in recorded blocks. */
+ uint64_t encoded_offset = htonu64(fo->repl_offset);
+ raxRemove(server.repl_backlog->blocks_index,
+ (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);
+
+ /* Delete the first node from global replication buffer. */
+ serverAssert(fo->refcount == 0 && fo->used == fo->size);
+ server.repl_buffer_mem -= (fo->size +
+ sizeof(listNode) + sizeof(replBufBlock));
+ listDelNode(server.repl_buffer_blocks, first);
+ }
+
+ server.repl_backlog->histlen -= trimmed_bytes;
+ /* Set the offset of the first byte we have in the backlog. */
+ server.repl_backlog->offset = server.master_repl_offset -
+ server.repl_backlog->histlen + 1;
+}
- return 1;
+/* Free replication buffer blocks that are referenced by this client. */
+void freeReplicaReferencedReplBuffer(client *replica) {
+ if (replica->ref_repl_buf_node != NULL) {
+ /* Decrease the start buffer node reference count. */
+ replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
+ serverAssert(o->refcount > 0);
+ o->refcount--;
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+ }
+ replica->ref_repl_buf_node = NULL;
+ replica->ref_block_pos = 0;
}
-/* Propagate write commands to slaves, and populate the replication backlog
- * as well. This function is used if the instance is a master: we use
- * the commands received by our clients in order to create the replication
- * stream. Instead if the instance is a slave and has sub-slaves attached,
- * we use replicationFeedSlavesFromMasterStream() */
-void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
- listNode *ln;
+/* Append bytes into the global replication buffer list, replication backlog and
+ * all replica clients use replication buffers collectively, this function replace
+ * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
+ * First we add buffer into global replication buffer block list, and then
+ * update replica / replication-backlog referenced node and block position. */
+void feedReplicationBuffer(char *s, size_t len) {
+ static long long repl_block_id = 0;
+
+ if (server.repl_backlog == NULL) return;
+ server.master_repl_offset += len;
+ server.repl_backlog->histlen += len;
+
+ /* Install write handler for all replicas. */
+ prepareReplicasToWrite();
+
+ size_t start_pos = 0; /* The position of referenced blok to start sending. */
+ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
+ int add_new_block = 0; /* Create new block if current block is total used. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+ /* Append to tail string when possible. */
+ if (tail && tail->size > tail->used) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = tail->used;
+ /* Copy the part we can fit into the tail, and leave the rest for a
+ * new node */
+ size_t avail = tail->size - tail->used;
+ size_t copy = (avail >= len) ? len : avail;
+ memcpy(tail->buf + tail->used, s, copy);
+ tail->used += copy;
+ s += copy;
+ len -= copy;
+ }
+ if (len) {
+ /* Create a new node, make sure it is allocated to at
+ * least PROTO_REPLY_CHUNK_BYTES */
+ size_t usable_size;
+ size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
+ tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
+ /* Take over the allocation's internal fragmentation */
+ tail->size = usable_size - sizeof(replBufBlock);
+ tail->used = len;
+ tail->refcount = 0;
+ tail->repl_offset = server.master_repl_offset - tail->used + 1;
+ tail->id = repl_block_id++;
+ memcpy(tail->buf, s, len);
+ listAddNodeTail(server.repl_buffer_blocks, tail);
+ /* We also count the list node memory into replication buffer memory. */
+ server.repl_buffer_mem += (usable_size + sizeof(listNode));
+ add_new_block = 1;
+ if (start_node == NULL) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = 0;
+ }
+ }
+
+ /* For output buffer of replicas. */
listIter li;
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+
+ /* Update shared replication buffer start position. */
+ if (slave->ref_repl_buf_node == NULL) {
+ slave->ref_repl_buf_node = start_node;
+ slave->ref_block_pos = start_pos;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ }
+
+ /* Check output buffer limit only when add new block. */
+ if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
+ }
+
+ /* For replication backlog */
+ if (server.repl_backlog->ref_repl_buf_node == NULL) {
+ server.repl_backlog->ref_repl_buf_node = start_node;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+
+ /* Replication buffer must be empty before adding replication stream
+ * into replication backlog. */
+ serverAssert(add_new_block == 1 && start_pos == 0);
+ }
+ if (add_new_block) {
+ /* To make search offset from replication buffer blocks quickly
+ * when replicas ask partial resynchronization, we create one index
+ * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
+ server.repl_backlog->unindexed_count++;
+ if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
+ uint64_t encoded_offset = htonu64(tail->repl_offset);
+ raxInsert(server.repl_backlog->blocks_index,
+ (unsigned char*)&encoded_offset, sizeof(uint64_t),
+ listLast(server.repl_buffer_blocks), NULL);
+ server.repl_backlog->unindexed_count = 0;
+ }
+ }
+ /* Try to trim replication backlog since replication backlog may exceed
+ * our setting when we add replication stream. Note that it is important to
+ * try to trim at least one node since in the common case this is where one
+ * new backlog node is added and one should be removed. See also comments
+ * in freeMemoryGetNotCountedMemory for details. */
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+}
+
+/* Propagate write commands to replication stream.
+ *
+ * This function is used if the instance is a master: we use the commands
+ * received by our clients in order to create the replication stream.
+ * Instead if the instance is a replica and has sub-replicas attached, we use
+ * replicationFeedStreamFromMasterStream() */
+void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];
@@ -252,68 +434,36 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
dictid_len, llstr));
}
- /* Add the SELECT command into the backlog. */
- if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
-
- /* Send it to slaves. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
- addReply(slave,selectcmd);
- }
+ feedReplicationBufferWithObject(selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
- /* Write the command to the replication backlog if any. */
- if (server.repl_backlog) {
- char aux[LONG_STR_SIZE+3];
-
- /* Add the multi bulk reply length. */
- aux[0] = '*';
- len = ll2string(aux+1,sizeof(aux)-1,argc);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
-
- for (j = 0; j < argc; j++) {
- long objlen = stringObjectLen(argv[j]);
+ /* Write the command to the replication buffer if any. */
+ char aux[LONG_STR_SIZE+3];
- /* We need to feed the buffer with the object as a bulk reply
- * not just as a plain string, so create the $..CRLF payload len
- * and add the final CRLF */
- aux[0] = '$';
- len = ll2string(aux+1,sizeof(aux)-1,objlen);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
- feedReplicationBacklogWithObject(argv[j]);
- feedReplicationBacklog(aux+len+1,2);
- }
- }
-
- /* Write the command to every slave. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
+ /* Add the multi bulk reply length. */
+ aux[0] = '*';
+ len = ll2string(aux+1,sizeof(aux)-1,argc);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBuffer(aux,len+3);
- if (!canFeedReplicaReplBuffer(slave)) continue;
-
- /* Feed slaves that are waiting for the initial SYNC (so these commands
- * are queued in the output buffer until the initial SYNC completes),
- * or are already in sync with the master. */
-
- /* Add the multi bulk length. */
- addReplyArrayLen(slave,argc);
+ for (j = 0; j < argc; j++) {
+ long objlen = stringObjectLen(argv[j]);
- /* Finally any additional argument that was not stored inside the
- * static buffer if any (from j to argc). */
- for (j = 0; j < argc; j++)
- addReplyBulk(slave,argv[j]);
+ /* We need to feed the buffer with the object as a bulk reply
+ * not just as a plain string, so create the $..CRLF payload len
+ * and add the final CRLF */
+ aux[0] = '$';
+ len = ll2string(aux+1,sizeof(aux)-1,objlen);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBuffer(aux,len+3);
+ feedReplicationBufferWithObject(argv[j]);
+ feedReplicationBuffer(aux+len+1,2);
}
}
@@ -323,26 +473,24 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (server.repl_backlog == NULL) return;
+ if (listLength(server.repl_buffer_blocks) == 0) return;
- long long dumplen = 256;
- if (server.repl_backlog_histlen < dumplen)
- dumplen = server.repl_backlog_histlen;
-
- /* Identify the first byte to dump. */
- long long idx =
- (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
- server.repl_backlog_size;
+ size_t dumplen = 256;
+ if (server.repl_backlog->histlen < (long long)dumplen)
+ dumplen = server.repl_backlog->histlen;
- /* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
+ listNode *node = listLast(server.repl_buffer_blocks);
while(dumplen) {
- long long thislen =
- ((server.repl_backlog_size - idx) < dumplen) ?
- (server.repl_backlog_size - idx) : dumplen;
-
- dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
+ if (node == NULL) break;
+ replBufBlock *o = listNodeValue(node);
+ size_t thislen = o->used >= dumplen ? dumplen : o->used;
+ sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen);
+ sds tmp = sdscatsds(head, dump);
+ sdsfree(dump);
+ dump = tmp;
dumplen -= thislen;
- idx = 0;
+ node = listPrevNode(node);
}
/* Finally log such bytes: this is vital debugging info to
@@ -354,10 +502,7 @@ void showLatestBacklog(void) {
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
-void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
- listNode *ln;
- listIter li;
-
+void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) {
/* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */
if (0) {
@@ -368,14 +513,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
printf("\n");
}
- if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
- addReplyProto(slave,buf,buflen);
- }
+ /* There must be replication backlog if having attached slaves. */
+ if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL);
+ if (server.repl_backlog) feedReplicationBuffer(buf,buflen);
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@@ -422,11 +562,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
- long long j, skip, len;
+ long long skip;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
- if (server.repl_backlog_histlen == 0) {
+ if (server.repl_backlog->histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
@@ -434,41 +574,58 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
- server.repl_backlog_off);
+ server.repl_backlog->offset);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
- server.repl_backlog_histlen);
- serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
- server.repl_backlog_idx);
+ server.repl_backlog->histlen);
/* Compute the amount of bytes we need to discard. */
- skip = offset - server.repl_backlog_off;
+ skip = offset - server.repl_backlog->offset;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
- /* Point j to the oldest byte, that is actually our
- * server.repl_backlog_off byte. */
- j = (server.repl_backlog_idx +
- (server.repl_backlog_size-server.repl_backlog_histlen)) %
- server.repl_backlog_size;
- serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
+ /* Iterate recorded blocks, quickly search the approximate node. */
+ listNode *node = NULL;
+ if (raxSize(server.repl_backlog->blocks_index) > 0) {
+ uint64_t encoded_offset = htonu64(offset);
+ raxIterator ri;
+ raxStart(&ri, server.repl_backlog->blocks_index);
+ raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t));
+ if (raxEOF(&ri)) {
+ /* No found, so search from the last recorded node. */
+ raxSeek(&ri, "$", NULL, 0);
+ raxPrev(&ri);
+ node = (listNode *)ri.data;
+ } else {
+ raxPrev(&ri); /* Skip the sought node. */
+ /* We should search from the prev node since the offset of current
+ * sought node exceeds searching offset. */
+ if (raxPrev(&ri))
+ node = (listNode *)ri.data;
+ else
+ node = server.repl_backlog->ref_repl_buf_node;
+ }
+ raxStop(&ri);
+ } else {
+ /* No recorded blocks, just from the start node to search. */
+ node = server.repl_backlog->ref_repl_buf_node;
+ }
- /* Discard the amount of data to seek to the specified 'offset'. */
- j = (j + skip) % server.repl_backlog_size;
+ /* Search the exact node. */
+ while (node != NULL) {
+ replBufBlock *o = listNodeValue(node);
+ if (o->repl_offset + (long long)o->used >= offset) break;
+ node = listNextNode(node);
+ }
+ serverAssert(node != NULL);
- /* Feed slave with data. Since it is a circular buffer we have to
- * split the reply in two parts if we are cross-boundary. */
- len = server.repl_backlog_histlen - skip;
- serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
- while(len) {
- long long thislen =
- ((server.repl_backlog_size - j) < len) ?
- (server.repl_backlog_size - j) : len;
+ /* Install a writer handler first.*/
+ prepareClientToWrite(c);
+ /* Setting output buffer of the replica. */
+ replBufBlock *o = listNodeValue(node);
+ o->refcount++;
+ c->ref_repl_buf_node = node;
+ c->ref_block_pos = offset - o->repl_offset;
- serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
- addReplyProto(c,server.repl_backlog + j, thislen);
- len -= thislen;
- j = 0;
- }
- return server.repl_backlog_histlen - skip;
+ return server.repl_backlog->histlen - skip;
}
/* Return the offset to provide as reply to the PSYNC command received
@@ -569,8 +726,8 @@ int masterTryPartialResynchronization(client *c) {
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
- psync_offset < server.repl_backlog_off ||
- psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
+ psync_offset < server.repl_backlog->offset ||
+ psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
@@ -853,7 +1010,8 @@ void syncCommand(client *c) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
- if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
+ if (!(c->flags & CLIENT_REPL_RDBONLY))
+ copyReplicaOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
@@ -3488,6 +3646,16 @@ void replicationCron(void) {
* with any persistence. */
removeRDBUsedToSyncReplicas();
+ /* Sanity check replication buffer, the first block of replication buffer blocks
+ * must be referenced by someone, since it will be freed when not referenced,
+ * otherwise, server will OOM. also, its refcount must not be more than
+ * replicas number + 1(replication backlog). */
+ if (listLength(server.repl_buffer_blocks) > 0) {
+ replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
+ serverAssert(o->refcount > 0 &&
+ o->refcount <= (int)listLength(server.slaves)+1);
+ }
+
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
diff --git a/src/server.c b/src/server.c
index ee1b689a6..dcce3d177 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3450,6 +3450,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
+ /* Incrementally trim replication backlog, 10 times the normal speed is
+ * to free replication backlog as much as possible. */
+ if (server.repl_backlog)
+ incrementalTrimReplicationBacklog(10*REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+
/* Disconnect some clients if they are consuming too much memory. */
evictClients();
@@ -3717,9 +3722,6 @@ void initServerConfig(void) {
/* Replication partial resync backlog */
server.repl_backlog = NULL;
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
- server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
/* Failover related */
@@ -4171,6 +4173,7 @@ void initServer(void) {
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
+ resetReplicationBuffer();
if ((server.tls_port || server.tls_replication || server.tls_cluster)
&& tlsConfigure(&server.tls_ctx_config) == C_ERR) {
@@ -6330,6 +6333,7 @@ sds genRedisInfoString(const char *section) {
"mem_fragmentation_bytes:%zd\r\n"
"mem_not_counted_for_evict:%zu\r\n"
"mem_replication_backlog:%zu\r\n"
+ "mem_total_replication_buffers:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
"mem_aof_buffer:%zu\r\n"
@@ -6374,6 +6378,7 @@ sds genRedisInfoString(const char *section) {
mh->total_frag_bytes,
freeMemoryGetNotCountedMemory(),
mh->repl_backlog,
+ server.repl_buffer_mem,
mh->clients_slaves,
mh->clients_normal,
mh->aof_buffer,
@@ -6762,8 +6767,8 @@ sds genRedisInfoString(const char *section) {
server.second_replid_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
- server.repl_backlog_off,
- server.repl_backlog_histlen);
+ server.repl_backlog ? server.repl_backlog->offset : 0,
+ server.repl_backlog ? server.repl_backlog->histlen : 0);
}
/* CPU */
@@ -7515,15 +7520,19 @@ void dismissMemoryInChild(void) {
/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
#if defined(USE_JEMALLOC) && defined(__linux__)
+ listIter li;
+ listNode *ln;
- /* Dismiss replication backlog. */
- if (server.repl_backlog != NULL) {
- dismissMemory(server.repl_backlog, server.repl_backlog_size);
+ /* Dismiss replication buffer. We don't need to separately dismiss replication
+ * backlog and replica' output buffer, because they just reference the global
+ * replication buffer but don't cost real memory. */
+ listRewind(server.repl_buffer_blocks, &li);
+ while((ln = listNext(&li))) {
+ replBufBlock *o = listNodeValue(ln);
+ dismissMemory(o, o->size);
}
/* Dismiss all clients memory. */
- listIter li;
- listNode *ln;
listRewind(server.clients, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
@@ -7592,15 +7601,34 @@ void loadDataFromDisk(void) {
server.second_replid_offset = rsi.repl_offset+1;
/* Rebase master_repl_offset from rsi.repl_offset. */
server.master_repl_offset += rsi.repl_offset;
- server.repl_backlog_off = server.master_repl_offset -
- server.repl_backlog_histlen + 1;
+ serverAssert(server.repl_backlog);
+ server.repl_backlog->offset = server.master_repl_offset -
+ server.repl_backlog->histlen + 1;
server.repl_no_slaves_since = time(NULL);
+
+ /* Rebase replication buffer blocks' offset since the previous
+ * setting offset starts from 0. */
+ listIter li;
+ listNode *ln;
+ listRewind(server.repl_buffer_blocks, &li);
+ while ((ln = listNext(&li))) {
+ replBufBlock *o = listNodeValue(ln);
+ o->repl_offset += rsi.repl_offset;
+ }
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
+
+ /* We always create replication backlog if server is a master, we need
+ * it because we put DELs in it when loading expired keys in RDB, but
+ * if RDB doesn't have replication info or there is no rdb, it is not
+ * possible to support partial resynchronization, to avoid extra memory
+ * of replication backlog, we drop it. */
+ if (server.master_repl_offset == 0 && server.repl_backlog)
+ freeReplicationBacklog();
}
}
diff --git a/src/server.h b/src/server.h
index b911697a1..21d5fcd65 100644
--- a/src/server.h
+++ b/src/server.h
@@ -377,6 +377,13 @@ typedef enum {
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
+/* The default number of replication backlog blocks to trim per call. */
+#define REPL_BACKLOG_TRIM_BLOCKS_PER_CALL 64
+
+/* In order to quickly find the requested offset for PSYNC requests,
+ * we index some nodes in the replication buffer linked list into a rax. */
+#define REPL_BACKLOG_INDEX_PER_BLOCKS 64
+
/* List related stuff */
#define LIST_HEAD 0
#define LIST_TAIL 1
@@ -767,6 +774,33 @@ typedef struct clientReplyBlock {
char buf[];
} clientReplyBlock;
+/* Replication buffer blocks is the list of replBufBlock.
+ *
+ * +--------------+ +--------------+ +--------------+
+ * | refcount = 1 | ... | refcount = 0 | ... | refcount = 2 |
+ * +--------------+ +--------------+ +--------------+
+ * | / \
+ * | / \
+ * | / \
+ * Repl Backlog Replia_A Replia_B
+ *
+ * Each replica or replication backlog increments only the refcount of the
+ * 'ref_repl_buf_node' which it points to. So when replica walks to the next
+ * node, it should first increase the next node's refcount, and when we trim
+ * the replication buffer nodes, we remove node always from the head node which
+ * refcount is 0. If the refcount of the head node is not 0, we must stop
+ * trimming and never iterate the next node. */
+
+/* Similar with 'clientReplyBlock', it is used for shared buffers between
+ * all replica clients and replication backlog. */
+typedef struct replBufBlock {
+ int refcount; /* Number of replicas or repl backlog using. */
+ long long id; /* The unique incremental number. */
+ long long repl_offset; /* Start replication offset of the block. */
+ size_t size, used;
+ char buf[];
+} replBufBlock;
+
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@@ -929,6 +963,24 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
+/* Replication backlog is not separate memory, it just is one consumer of
+ * the global replication buffer. This structure records the reference of
+ * replication buffers. Since the replication buffer block list may be very long,
+ * it would cost much time to search replication offset on partial resync, so
+ * we use one rax tree to index some blocks every REPL_BACKLOG_INDEX_PER_BLOCKS
+ * to make searching offset from replication buffer blocks list faster. */
+typedef struct replBacklog {
+ listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
+ * see the definition of replBufBlock. */
+ size_t unindexed_count; /* The count from last creating index block. */
+ rax *blocks_index; /* The index of reocrded blocks of replication
+ * buffer for quickly searching replication
+ * offset on partial resynchronization. */
+ long long histlen; /* Backlog actual data length */
+ long long offset; /* Replication "master offset" of first
+ * byte in the replication backlog buffer.*/
+} replBacklog;
+
typedef struct {
list *clients;
size_t mem_usage_sum;
@@ -1029,6 +1081,11 @@ typedef struct client {
listNode *mem_usage_bucket_node;
clientMemUsageBucket *mem_usage_bucket;
+ listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
+ * see the definition of replBufBlock. */
+ size_t ref_block_pos; /* Access position of referenced buffer block,
+ * i.e. the next offset to send. */
+
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
@@ -1528,14 +1585,8 @@ struct redisServer {
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
- char *repl_backlog; /* Replication backlog for partial syncs */
+ replBacklog *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
- long long cfg_repl_backlog_size;/* Backlog circular buffer size in config */
- long long repl_backlog_histlen; /* Backlog actual data length */
- long long repl_backlog_idx; /* Backlog circular buffer current offset,
- that is the next byte will'll write to.*/
- long long repl_backlog_off; /* Replication "master offset" of first
- byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@@ -1547,6 +1598,9 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
+ size_t repl_buffer_mem; /* The memory of replication buffer. */
+ list *repl_buffer_blocks; /* Replication buffers blocks list
+ * (serving replica clients and repl backlog) */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
sds masterauth; /* AUTH with this password with master */
@@ -2031,6 +2085,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(connection *conn);
+int prepareClientToWrite(client *c);
void addReplyNull(client *c);
void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
@@ -2063,8 +2118,8 @@ void addReplyPushLen(client *c, long length);
void addReplyHelp(client *c, const char **help);
void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
+void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
-void copyClientOutputBuffer(client *dst, client *src);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
@@ -2238,7 +2293,10 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
-void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
+void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
+void resetReplicationBuffer(void);
+void feedReplicationBuffer(char *buf, size_t len);
+void freeReplicaReferencedReplBuffer(client *replica);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@@ -2264,8 +2322,11 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void createReplicationBacklog(void);
+void freeReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
+void incrementalTrimReplicationBacklog(size_t blocks);
+int canFeedReplicaReplBuffer(client *replica);
void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
@@ -2613,7 +2674,7 @@ size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);
void freeObjAsync(robj *key, robj *obj, int dbid);
-
+void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl
index 301a4f63d..672143102 100644
--- a/tests/integration/psync2-master-restart.tcl
+++ b/tests/integration/psync2-master-restart.tcl
@@ -118,7 +118,8 @@ start_server {} {
$master config rewrite
$master debug set-active-expire 0
- for {set j 0} {$j < 1024} {incr j} {
+ # Make sure replication backlog is full and will be trimmed.
+ for {set j 0} {$j < 2048} {incr j} {
$master select [expr $j%16]
$master set $j somevalue px 10
}
@@ -149,7 +150,7 @@ start_server {} {
assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
assert {[status $master sync_partial_ok] == 0}
assert {[status $master sync_full] == 1}
- assert {[status $master rdb_last_load_keys_expired] == 1024}
+ assert {[status $master rdb_last_load_keys_expired] == 2048}
assert {[status $replica sync_full] == 1}
set digest [$master debug digest]
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
new file mode 100644
index 000000000..905766ae2
--- /dev/null
+++ b/tests/integration/replication-buffer.tcl
@@ -0,0 +1,218 @@
+# This test group aims to test that all replicas share one global replication buffer,
+# two replicas don't make replication buffer size double, and when there is no replica,
+# replica buffer will shrink.
+start_server {tags {"repl external:skip"}} {
+start_server {} {
+start_server {} {
+start_server {} {
+ set replica1 [srv -3 client]
+ set replica2 [srv -2 client]
+ set replica3 [srv -1 client]
+
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ $master config set save ""
+ $master config set repl-backlog-size 16384
+ $master config set client-output-buffer-limit "replica 0 0 0"
+
+ # Make sure replica3 is synchronized with master
+ $replica3 replicaof $master_host $master_port
+ wait_for_sync $replica3
+
+ # Generating RDB will take some 100 seconds
+ $master config set rdb-key-save-delay 1000000
+ populate 100 "" 16
+
+ # Make sure replica1 and replica2 are waiting bgsave
+ $replica1 replicaof $master_host $master_port
+ $replica2 replicaof $master_host $master_port
+ wait_for_condition 50 100 {
+ ([s rdb_bgsave_in_progress] == 1) &&
+ [lindex [$replica1 role] 3] eq {sync} &&
+ [lindex [$replica2 role] 3] eq {sync}
+ } else {
+ fail "fail to sync with replicas"
+ }
+
+ test {All replicas share one global replication buffer} {
+ set before_used [s used_memory]
+ populate 1024 "" 1024 ; # Write extra 1M data
+ # New data uses 1M memory, but all replicas use only one
+ # replication buffer, so all replicas output memory is not
+ # more than double of replication buffer.
+ set repl_buf_mem [s mem_total_replication_buffers]
+ set extra_mem [expr {[s used_memory]-$before_used-1024*1024}]
+ assert {$extra_mem < 2*$repl_buf_mem}
+
+ # Kill replica1, replication_buffer will not become smaller
+ catch {$replica1 shutdown nosave}
+ wait_for_condition 50 100 {
+ [s connected_slaves] eq {2}
+ } else {
+ fail "replica doesn't disconnect with master"
+ }
+ assert_equal $repl_buf_mem [s mem_total_replication_buffers]
+ }
+
+ test {Replication buffer will become smaller when no replica uses} {
+ # Make sure replica3 catch up with the master
+ wait_for_ofs_sync $master $replica3
+
+ set repl_buf_mem [s mem_total_replication_buffers]
+ # Kill replica2, replication_buffer will become smaller
+ catch {$replica2 shutdown nosave}
+ wait_for_condition 50 100 {
+ [s connected_slaves] eq {1}
+ } else {
+ fail "replica2 doesn't disconnect with master"
+ }
+ assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
+ }
+}
+}
+}
+}
+
+# This test group aims to test replication backlog size can outgrow the backlog
+# limit config if there is a slow replica which keep massive replication buffers,
+# and replicas could use this replication buffer (beyond backlog config) for
+# partial re-synchronization. Of course, replication backlog memory also can
+# become smaller when master disconnects with slow replicas since output buffer
+# limit is reached.
+start_server {tags {"repl external:skip"}} {
+start_server {} {
+start_server {} {
+ set replica1 [srv -2 client]
+ set replica1_pid [s -2 process_id]
+ set replica2 [srv -1 client]
+ set replica2_pid [s -1 process_id]
+
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ $master config set save ""
+ $master config set repl-backlog-size 16384
+ $master config set client-output-buffer-limit "replica 0 0 0"
+ $replica1 replicaof $master_host $master_port
+ wait_for_sync $replica1
+
+ test {Replication backlog size can outgrow the backlog limit config} {
+ # Generating RDB will take 1000 seconds
+ $master config set rdb-key-save-delay 1000000
+ populate 1000 master 10000
+ $replica2 replicaof $master_host $master_port
+ # Make sure replica2 is waiting bgsave
+ wait_for_condition 5000 100 {
+ ([s rdb_bgsave_in_progress] == 1) &&
+ [lindex [$replica2 role] 3] eq {sync}
+ } else {
+ fail "fail to sync with replicas"
+ }
+ # Replication actual backlog grow more than backlog setting since
+ # the slow replica2 kept replication buffer.
+ populate 10000 master 10000
+ assert {[s repl_backlog_histlen] > [expr 10000*10000]}
+ }
+
+ # Wait replica1 catch up with the master
+ wait_for_condition 1000 100 {
+ [s -2 master_repl_offset] eq [s master_repl_offset]
+ } else {
+ fail "Replica offset didn't catch up with the master after too long time"
+ }
+
+ test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} {
+ # replica1 disconnects with master
+ $replica1 replicaof [srv -1 host] [srv -1 port]
+ # Write a mass of data that exceeds repl-backlog-size
+ populate 10000 master 10000
+ # replica1 reconnects with master
+ $replica1 replicaof $master_host $master_port
+ wait_for_condition 1000 100 {
+ [s -2 master_repl_offset] eq [s master_repl_offset]
+ } else {
+ fail "Replica offset didn't catch up with the master after too long time"
+ }
+
+ # replica2 still waits for bgsave ending
+ assert {[s rdb_bgsave_in_progress] eq {1} && [lindex [$replica2 role] 3] eq {sync}}
+ # master accepted replica1 partial resync
+ assert_equal [s sync_partial_ok] {1}
+ assert_equal [$master debug digest] [$replica1 debug digest]
+ }
+
+ test {Replication backlog memory will become smaller if disconnecting with replica} {
+ assert {[s repl_backlog_histlen] > [expr 2*10000*10000]}
+ assert_equal [s connected_slaves] {2}
+
+ exec kill -SIGSTOP $replica2_pid
+ r config set client-output-buffer-limit "replica 128k 0 0"
+ # trigger output buffer limit check
+ r set key [string repeat A [expr 64*1024]]
+ # master will close replica2's connection since replica2's output
+ # buffer limit is reached, so there only is replica1.
+ wait_for_condition 100 100 {
+ [s connected_slaves] eq {1}
+ } else {
+ fail "master didn't disconnect with replica2"
+ }
+
+ # Since we trim replication backlog inrementally, replication backlog
+ # memory may take time to be reclaimed.
+ wait_for_condition 1000 100 {
+ [s repl_backlog_histlen] < [expr 10000*10000]
+ } else {
+ fail "Replication backlog memory is not smaller"
+ }
+ exec kill -SIGCONT $replica2_pid
+ }
+}
+}
+}
+
+test {Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size} {
+ start_server {tags {"repl external:skip"}} {
+ start_server {} {
+ r config set save ""
+ r config set repl-backlog-size 100mb
+ r config set client-output-buffer-limit "replica 512k 0 0"
+
+ set replica [srv -1 client]
+ $replica replicaof [srv 0 host] [srv 0 port]
+ wait_for_sync $replica
+
+ set big_str [string repeat A [expr 10*1024*1024]] ;# 10mb big string
+ r multi
+ r client kill type replica
+ r set key $big_str
+ r set key $big_str
+ r debug sleep 2 ;# wait for replica reconnecting
+ r exec
+ # When replica reconnects with master, master accepts partial resync,
+ # and don't close replica client even client output buffer limit is
+ # reached.
+ r set key $big_str ;# trigger output buffer limit check
+ wait_for_ofs_sync r $replica
+ # master accepted replica partial resync
+ assert_equal [s sync_full] {1}
+ assert_equal [s sync_partial_ok] {1}
+
+ r multi
+ r set key $big_str
+ r set key $big_str
+ r exec
+ # replica's reply buffer size is more than client-output-buffer-limit but
+ # doesn't exceed repl-backlog-size, we don't close replica client.
+ wait_for_condition 1000 100 {
+ [s -1 master_repl_offset] eq [s master_repl_offset]
+ } else {
+ fail "Replica offset didn't catch up with the master after too long time"
+ }
+ assert_equal [s sync_full] {1}
+ assert_equal [s sync_partial_ok] {1}
+ }
+ }
+}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 3169245b1..e0529dcc3 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -527,8 +527,11 @@ test {diskless loading short read} {
$master multi
$master client kill type replica
$master set asdf asdf
- # the side effect of resizing the backlog is that it is flushed (16k is the min size)
- $master config set repl-backlog-size [expr {16384 + $i}]
+ # fill replication backlog with new content
+ $master config set repl-backlog-size 16384
+ for {set keyid 0} {$keyid < 10} {incr keyid} {
+ $master set "$keyid string_$keyid" [string repeat A 16384]
+ }
$master exec
}
# wait for loading to stop (fail)
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 48f8d7d47..dd2f1c970 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -43,6 +43,7 @@ set ::all_tests {
integration/replication-3
integration/replication-4
integration/replication-psync
+ integration/replication-buffer
integration/aof
integration/rdb
integration/corrupt-dump
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index a98bfdb37..2e84a908d 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -355,7 +355,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
$rd_master setrange key:0 0 [string repeat A $payload_len]
}
for {set k 0} {$k < $cmd_count} {incr k} {
- #$rd_master read
+ $rd_master read
}
} else {
for {set k 0} {$k < $cmd_count} {incr k} {
@@ -382,12 +382,14 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
assert {$delta < $delta_max && $delta > -$delta_max}
$master client kill type slave
- set killed_used [s -1 used_memory]
+ set info_str [$master info memory]
+ set killed_used [getInfoProperty $info_str used_memory]
+ set killed_mem_not_counted_for_evict [getInfoProperty $info_str mem_not_counted_for_evict]
set killed_slave_buf [s -1 mem_clients_slaves]
- set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
# we need to exclude replies buffer and query buffer of slave from used memory after kill slave
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict - [slave_query_buffer $master]}]
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
+ assert {[$master dbsize] == 100}
assert {$killed_slave_buf == 0}
assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max}
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
index 5ba102284..8bc0f7cd4 100644
--- a/tests/unit/moduleapi/testrdb.tcl
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -107,8 +107,11 @@ tags "modules" {
$master multi
$master client kill type replica
$master set asdf asdf
- # the side effect of resizing the backlog is that it is flushed (16k is the min size)
- $master config set repl-backlog-size [expr {16384 + $i}]
+ # fill replication backlog with new content
+ $master config set repl-backlog-size 16384
+ for {set keyid 0} {$keyid < 10} {incr keyid} {
+ $master set "$keyid string_$keyid" [string repeat A 16384]
+ }
$master exec
}
# wait for loading to stop (fail)