summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxbasel <103044017+xbasel@users.noreply.github.com>2023-03-12 19:47:06 +0200
committerGitHub <noreply@github.com>2023-03-12 19:47:06 +0200
commit7be7834e6573134b1e5a0b8ea90acf856b0c7774 (patch)
tree79b2b2b961c32f1ebc3b0e63a11d0a5daf0f6ff4
parent08cd3bf2924da099facdc1fa30706c93ad3b4871 (diff)
downloadredis-7be7834e6573134b1e5a0b8ea90acf856b0c7774.tar.gz
Large blocks of replica client output buffer could lead to psync loops and unnecessary memory usage (#11666)
This can happen when a key almost equal or larger than the client output buffer limit of the replica is written. Example: 1. DB is empty 2. Backlog size is 1 MB 3. Client out put buffer limit is 2 MB 4. Client writes a 3 MB key 5. The shared replication buffer will have a single node which contains the key written above, and it exceeds the backlog size. At this point the client output buffer usage calculation will report the replica buffer to be 3 MB (or more) even after sending all the data to the replica. The primary drops the replica connection for exceeding the limits, the replica reconnects and successfully executes partial sync but the primary will drop the connection again because the buffer usage is still 3 MB. This happens over and over. To mitigate the problem, this fix limits the maximum size of a single backlog node to be (repl_backlog_size/16). This way a single node can't exceed the limits of the COB (the COB has to be larger than the backlog). It also means that if the backlog has some excessive data it can't trim, it would be at most about 6% overuse. other notes: 1. a loop was added in feedReplicationBuffer which caused a massive LOC change due to indentation, the actual changes are just the `min(max` and the loop. 3. an unrelated change in an existing test to speed up a server termination which took 10 seconds. Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/replication.c152
-rw-r--r--tests/integration/replication-buffer.tcl57
2 files changed, 137 insertions, 72 deletions
diff --git a/src/replication.c b/src/replication.c
index 33bb8242c..854688f2c 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -335,86 +335,94 @@ void feedReplicationBuffer(char *s, size_t len) {
server.master_repl_offset += len;
server.repl_backlog->histlen += len;
- size_t start_pos = 0; /* The position of referenced block to start sending. */
- listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
- int add_new_block = 0; /* Create new block if current block is total used. */
- listNode *ln = listLast(server.repl_buffer_blocks);
- replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
-
- /* Append to tail string when possible. */
- if (tail && tail->size > tail->used) {
- start_node = listLast(server.repl_buffer_blocks);
- start_pos = tail->used;
- /* Copy the part we can fit into the tail, and leave the rest for a
- * new node */
- size_t avail = tail->size - tail->used;
- size_t copy = (avail >= len) ? len : avail;
- memcpy(tail->buf + tail->used, s, copy);
- tail->used += copy;
- s += copy;
- len -= copy;
- }
- if (len) {
- /* Create a new node, make sure it is allocated to at
- * least PROTO_REPLY_CHUNK_BYTES */
- size_t usable_size;
- size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
- tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
- /* Take over the allocation's internal fragmentation */
- tail->size = usable_size - sizeof(replBufBlock);
- tail->used = len;
- tail->refcount = 0;
- tail->repl_offset = server.master_repl_offset - tail->used + 1;
- tail->id = repl_block_id++;
- memcpy(tail->buf, s, len);
- listAddNodeTail(server.repl_buffer_blocks, tail);
- /* We also count the list node memory into replication buffer memory. */
- server.repl_buffer_mem += (usable_size + sizeof(listNode));
- add_new_block = 1;
- if (start_node == NULL) {
+ while(len > 0) {
+ size_t start_pos = 0; /* The position of referenced block to start sending. */
+ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
+ int add_new_block = 0; /* Create new block if current block is total used. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+ /* Append to tail string when possible. */
+ if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
- start_pos = 0;
+ start_pos = tail->used;
+ /* Copy the part we can fit into the tail, and leave the rest for a
+ * new node */
+ size_t avail = tail->size - tail->used;
+ size_t copy = (avail >= len) ? len : avail;
+ memcpy(tail->buf + tail->used, s, copy);
+ tail->used += copy;
+ s += copy;
+ len -= copy;
+ }
+ if (len) {
+ /* Create a new node, make sure it is allocated to at
+ * least PROTO_REPLY_CHUNK_BYTES */
+ size_t usable_size;
+ /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
+ * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
+ * trim when we only still need to hold a small portion from them. */
+ size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), (size_t)server.repl_backlog_size / 16);
+ tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
+ /* Take over the allocation's internal fragmentation */
+ tail->size = usable_size - sizeof(replBufBlock);
+ size_t copy = (tail->size >= len) ? len : tail->size;
+ tail->used = copy;
+ tail->refcount = 0;
+ tail->repl_offset = server.master_repl_offset - tail->used + 1;
+ tail->id = repl_block_id++;
+ memcpy(tail->buf, s, copy);
+ listAddNodeTail(server.repl_buffer_blocks, tail);
+ /* We also count the list node memory into replication buffer memory. */
+ server.repl_buffer_mem += (usable_size + sizeof(listNode));
+ add_new_block = 1;
+ if (start_node == NULL) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = 0;
+ }
+ s += copy;
+ len -= copy;
}
- }
- /* For output buffer of replicas. */
- listIter li;
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
- if (!canFeedReplicaReplBuffer(slave)) continue;
+ /* For output buffer of replicas. */
+ listIter li;
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+
+ /* Update shared replication buffer start position. */
+ if (slave->ref_repl_buf_node == NULL) {
+ slave->ref_repl_buf_node = start_node;
+ slave->ref_block_pos = start_pos;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ }
- /* Update shared replication buffer start position. */
- if (slave->ref_repl_buf_node == NULL) {
- slave->ref_repl_buf_node = start_node;
- slave->ref_block_pos = start_pos;
- /* Only increase the start block reference count. */
- ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ /* Check output buffer limit only when add new block. */
+ if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
}
- /* Check output buffer limit only when add new block. */
- if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
- }
-
- /* For replication backlog */
- if (server.repl_backlog->ref_repl_buf_node == NULL) {
- server.repl_backlog->ref_repl_buf_node = start_node;
- /* Only increase the start block reference count. */
- ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ /* For replication backlog */
+ if (server.repl_backlog->ref_repl_buf_node == NULL) {
+ server.repl_backlog->ref_repl_buf_node = start_node;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
- /* Replication buffer must be empty before adding replication stream
- * into replication backlog. */
- serverAssert(add_new_block == 1 && start_pos == 0);
- }
- if (add_new_block) {
- createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+ /* Replication buffer must be empty before adding replication stream
+ * into replication backlog. */
+ serverAssert(add_new_block == 1 && start_pos == 0);
+ }
+ if (add_new_block) {
+ createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+ }
+ /* Try to trim replication backlog since replication backlog may exceed
+ * our setting when we add replication stream. Note that it is important to
+ * try to trim at least one node since in the common case this is where one
+ * new backlog node is added and one should be removed. See also comments
+ * in freeMemoryGetNotCountedMemory for details. */
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
- /* Try to trim replication backlog since replication backlog may exceed
- * our setting when we add replication stream. Note that it is important to
- * try to trim at least one node since in the common case this is where one
- * new backlog node is added and one should be removed. See also comments
- * in freeMemoryGetNotCountedMemory for details. */
- incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
/* Propagate write commands to replication stream.
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
index 07e2cbc1b..fe85632ae 100644
--- a/tests/integration/replication-buffer.tcl
+++ b/tests/integration/replication-buffer.tcl
@@ -180,6 +180,8 @@ start_server {} {
}
exec kill -SIGCONT $replica2_pid
}
+ # speed up termination
+ $master config set shutdown-timeout 0
}
}
}
@@ -227,3 +229,58 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
}
}
}
+
+# This test was added to make sure big keys added to the backlog do not trigger psync loop.
+test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} {
+ proc client_field {r type f} {
+ set client [$r client list type $type]
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} {
+ error "field $f not found for in $client"
+ }
+ return $res
+ }
+
+ start_server {tags {"repl external:skip"}} {
+ start_server {} {
+ set replica [srv -1 client]
+ set replica_host [srv -1 host]
+ set replica_port [srv -1 port]
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ $master config set repl-backlog-size 16384
+ $master config set client-output-buffer-limit "replica 32768 32768 60"
+ # Key has has to be larger than replica client-output-buffer limit.
+ set keysize [expr 256*1024]
+
+ $replica replicaof $master_host $master_port
+ wait_for_condition 50 100 {
+ [lindex [$replica role] 0] eq {slave} &&
+ [string match {*master_link_status:up*} [$replica info replication]]
+ } else {
+ fail "Can't turn the instance into a replica"
+ }
+
+ set _v [prepare_value $keysize]
+ $master set key $_v
+ wait_for_ofs_sync $master $replica
+
+ # Write another key to force the test to wait for another event loop iteration
+ # to give the serverCron a chance to disconnect replicas with COB size exeeeding the limits
+ $master set key1 "1"
+ wait_for_ofs_sync $master $replica
+
+ assert {[status $master connected_slaves] == 1}
+
+ wait_for_condition 50 100 {
+ [client_field $master replica tot-mem] < $keysize
+ } else {
+ fail "replica client-output-buffer usage is higher than expected."
+ }
+
+ assert {[status $master sync_partial_ok] == 0}
+ }
+ }
+}
+