diff options
2 files changed, 137 insertions, 72 deletions
diff --git a/src/replication.c b/src/replication.c
index 33bb8242c..854688f2c 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -335,86 +335,94 @@ void feedReplicationBuffer(char *s, size_t len) {
server.master_repl_offset += len;
server.repl_backlog->histlen += len;
- size_t start_pos = 0; /* The position of referenced block to start sending. */
- listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
- int add_new_block = 0; /* Create new block if current block is total used. */
- listNode *ln = listLast(server.repl_buffer_blocks);
- replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
- /* Append to tail string when possible. */
- if (tail && tail->size > tail->used) {
- start_node = listLast(server.repl_buffer_blocks);
- start_pos = tail->used;
- /* Copy the part we can fit into the tail, and leave the rest for a
- * new node */
- size_t avail = tail->size - tail->used;
- size_t copy = (avail >= len) ? len : avail;
- memcpy(tail->buf + tail->used, s, copy);
- tail->used += copy;
- s += copy;
- len -= copy;
- }
- if (len) {
- /* Create a new node, make sure it is allocated to at
- size_t usable_size;
- size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
- tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
- /* Take over the allocation's internal fragmentation */
- tail->size = usable_size - sizeof(replBufBlock);
- tail->used = len;
- tail->refcount = 0;
- tail->repl_offset = server.master_repl_offset - tail->used + 1;
- tail->id = repl_block_id++;
- memcpy(tail->buf, s, len);
- listAddNodeTail(server.repl_buffer_blocks, tail);
- /* We also count the list node memory into replication buffer memory. */
- server.repl_buffer_mem += (usable_size + sizeof(listNode));
- add_new_block = 1;
- if (start_node == NULL) {
+ while(len > 0) {
+ size_t start_pos = 0; /* The position of referenced block to start sending. */
+ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
+ int add_new_block = 0; /* Create new block if current block is total used. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+ /* Append to tail string when possible. */
+ if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
- start_pos = 0;
+ start_pos = tail->used;
+ /* Copy the part we can fit into the tail, and leave the rest for a
+ * new node */
+ size_t avail = tail->size - tail->used;
+ size_t copy = (avail >= len) ? len : avail;
+ memcpy(tail->buf + tail->used, s, copy);
+ tail->used += copy;
+ s += copy;
+ len -= copy;
+ }
+ if (len) {
+ /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES */
+ size_t usable_size;
+ /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
+ * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
+ * be trimmed while we still need to hold only a small portion of them. */
+ size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), (size_t)server.repl_backlog_size / 16);
+ tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
+ /* Take over the allocation's internal fragmentation */
+ tail->size = usable_size - sizeof(replBufBlock);
+ size_t copy = (tail->size >= len) ? len : tail->size;
+ tail->used = copy;
+ tail->refcount = 0;
+ tail->repl_offset = server.master_repl_offset - tail->used + 1;
+ tail->id = repl_block_id++;
+ memcpy(tail->buf, s, copy);
+ listAddNodeTail(server.repl_buffer_blocks, tail);
+ /* We also count the list node memory into replication buffer memory. */
+ server.repl_buffer_mem += (usable_size + sizeof(listNode));
+ add_new_block = 1;
+ if (start_node == NULL) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = 0;
+ }
+ s += copy;
+ len -= copy;
- }
- /* For output buffer of replicas. */
- listIter li;
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
- if (!canFeedReplicaReplBuffer(slave)) continue;
+ /* For output buffer of replicas. */
+ listIter li;
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+ /* Update shared replication buffer start position. */
+ if (slave->ref_repl_buf_node == NULL) {
+ slave->ref_repl_buf_node = start_node;
+ slave->ref_block_pos = start_pos;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ }
- /* Update shared replication buffer start position. */
- if (slave->ref_repl_buf_node == NULL) {
- slave->ref_repl_buf_node = start_node;
- slave->ref_block_pos = start_pos;
- /* Only increase the start block reference count. */
- ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ /* Check output buffer limit only when add new block. */
+ if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
- /* Check output buffer limit only when add new block. */
- if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
- }
- /* For replication backlog */
- if (server.repl_backlog->ref_repl_buf_node == NULL) {
- server.repl_backlog->ref_repl_buf_node = start_node;
- /* Only increase the start block reference count. */
- ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ /* For replication backlog */
+ if (server.repl_backlog->ref_repl_buf_node == NULL) {
+ server.repl_backlog->ref_repl_buf_node = start_node;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
- /* Replication buffer must be empty before adding replication stream
- * into replication backlog. */
- serverAssert(add_new_block == 1 && start_pos == 0);
- }
- if (add_new_block) {
- createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+ /* Replication buffer must be empty before adding replication stream
+ * into replication backlog. */
+ serverAssert(add_new_block == 1 && start_pos == 0);
+ }
+ if (add_new_block) {
+ createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+ }
+ /* Try to trim replication backlog since replication backlog may exceed
+ * our setting when we add replication stream. Note that it is important to
+ * try to trim at least one node since in the common case this is where one
+ * new backlog node is added and one should be removed. See also comments
+ * in freeMemoryGetNotCountedMemory for details. */
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
- /* Try to trim replication backlog since replication backlog may exceed
- * our setting when we add replication stream. Note that it is important to
- * try to trim at least one node since in the common case this is where one
- * new backlog node is added and one should be removed. See also comments
- * in freeMemoryGetNotCountedMemory for details. */
- incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
/* Propagate write commands to replication stream.
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
index 07e2cbc1b..fe85632ae 100644
--- a/tests/integration/replication-buffer.tcl
+++ b/tests/integration/replication-buffer.tcl
@@ -180,6 +180,8 @@ start_server {} {
exec kill -SIGCONT $replica2_pid
+ # speed up termination
+ $master config set shutdown-timeout 0
@@ -227,3 +229,58 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
+# This test was added to make sure big keys added to the backlog do not trigger psync loop.
+test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} {
+ proc client_field {r type f} {
+ set client [$r client list type $type]
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} {
+ error "field $f not found for in $client"
+ }
+ return $res
+ }
+ start_server {tags {"repl external:skip"}} {
+ start_server {} {
+ set replica [srv -1 client]
+ set replica_host [srv -1 host]
+ set replica_port [srv -1 port]
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ $master config set repl-backlog-size 16384
+ $master config set client-output-buffer-limit "replica 32768 32768 60"
+ # Key has to be larger than the replica client-output-buffer limit.
+ set keysize [expr 256*1024]
+ $replica replicaof $master_host $master_port
+ wait_for_condition 50 100 {
+ [lindex [$replica role] 0] eq {slave} &&
+ [string match {*master_link_status:up*} [$replica info replication]]
+ } else {
+ fail "Can't turn the instance into a replica"
+ }
+ set _v [prepare_value $keysize]
+ $master set key $_v
+ wait_for_ofs_sync $master $replica
+ # Write another key to force the test to wait for another event loop iteration
+ # to give the serverCron a chance to disconnect replicas with COB size exceeding the limits
+ $master set key1 "1"
+ wait_for_ofs_sync $master $replica
+ assert {[status $master connected_slaves] == 1}
+ wait_for_condition 50 100 {
+ [client_field $master replica tot-mem] < $keysize
+ } else {
+ fail "replica client-output-buffer usage is higher than expected."
+ }
+ assert {[status $master sync_partial_ok] == 0}
+ }
+ }