path: root/src/replication.c
author	xbasel <103044017+xbasel@users.noreply.github.com>	2023-03-12 19:47:06 +0200
committer	GitHub <noreply@github.com>	2023-03-12 19:47:06 +0200
commit	7be7834e6573134b1e5a0b8ea90acf856b0c7774 (patch)
tree	79b2b2b961c32f1ebc3b0e63a11d0a5daf0f6ff4 /src/replication.c
parent	08cd3bf2924da099facdc1fa30706c93ad3b4871 (diff)
download	redis-7be7834e6573134b1e5a0b8ea90acf856b0c7774.tar.gz
Large blocks of replica client output buffer could lead to psync loops and unnecessary memory usage (#11666)
This can happen when a key nearly equal to, or larger than, the client output buffer limit of the replica is written. Example:
1. DB is empty
2. Backlog size is 1 MB
3. Client output buffer limit is 2 MB
4. Client writes a 3 MB key
5. The shared replication buffer will have a single node which contains the key written above, and it exceeds the backlog size.
At this point the client output buffer usage calculation will report the replica buffer to be 3 MB (or more) even after sending all the data to the replica. The primary drops the replica connection for exceeding the limits, the replica reconnects and successfully executes a partial sync, but the primary drops the connection again because the buffer usage is still 3 MB. This happens over and over.
To mitigate the problem, this fix limits the maximum size of a single backlog node to (repl_backlog_size/16). This way a single node can't exceed the limits of the COB (the COB has to be larger than the backlog). It also means that if the backlog holds some excess data it can't trim, the overuse is at most about 6%.
other notes:
1. a loop was added in feedReplicationBuffer which caused a massive LOC change due to indentation; the actual changes are just the `min(max` and the loop.
3. an unrelated change in an existing test to speed up a server termination which took 10 seconds.
Co-authored-by: Oran Agra <oran@redislabs.com>
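For readers skimming the patch, the heart of the fix is the clamp applied when sizing a new replication buffer block. Below is a minimal standalone sketch (not part of the commit): it assumes the 16 KB PROTO_REPLY_CHUNK_BYTES default and uses local my_min/my_max helpers as stand-ins for the min/max macros the patch relies on.

#include <stdio.h>
#include <stddef.h>

#define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16 KB, mirroring the Redis default; an assumption of this sketch */
#define my_min(a, b) ((a) < (b) ? (a) : (b))
#define my_max(a, b) ((a) > (b) ? (a) : (b))

/* Size chosen for a new replication buffer block: at least one reply chunk so
 * later writes can still be appended to it, but never more than 1/16 of the
 * backlog so a single block cannot dominate the output buffer accounting. */
static size_t repl_block_alloc_size(size_t len, size_t repl_backlog_size) {
    return my_min(my_max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), repl_backlog_size / 16);
}

int main(void) {
    size_t backlog = 1024 * 1024;       /* 1 MB backlog, as in the example above */
    size_t big_write = 3 * 1024 * 1024; /* the 3 MB key from the example */

    /* With the cap, the 3 MB write is split across ~48 blocks of 64 KB each,
     * so fully-sent blocks can be trimmed instead of pinning 3 MB forever. */
    printf("block size for a 3 MB write:   %zu bytes\n",
           repl_block_alloc_size(big_write, backlog));
    printf("block size for a 10 byte write: %zu bytes\n",
           repl_block_alloc_size(10, backlog));
    return 0;
}

With a 1 MB backlog the cap works out to 64 KB per block; since any single block is at most 1/16 (6.25%) of the backlog, untrimmable excess is bounded at roughly the 6% figure mentioned above.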
Diffstat (limited to 'src/replication.c')
-rw-r--r--	src/replication.c | 152
1 file changed, 80 insertions, 72 deletions
diff --git a/src/replication.c b/src/replication.c
index 33bb8242c..854688f2c 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -335,86 +335,94 @@ void feedReplicationBuffer(char *s, size_t len) {
    server.master_repl_offset += len;
    server.repl_backlog->histlen += len;
-    size_t start_pos = 0; /* The position of referenced block to start sending. */
-    listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
-    int add_new_block = 0; /* Create new block if current block is total used. */
-    listNode *ln = listLast(server.repl_buffer_blocks);
-    replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
-
-    /* Append to tail string when possible. */
-    if (tail && tail->size > tail->used) {
-        start_node = listLast(server.repl_buffer_blocks);
-        start_pos = tail->used;
-        /* Copy the part we can fit into the tail, and leave the rest for a
-         * new node */
-        size_t avail = tail->size - tail->used;
-        size_t copy = (avail >= len) ? len : avail;
-        memcpy(tail->buf + tail->used, s, copy);
-        tail->used += copy;
-        s += copy;
-        len -= copy;
-    }
-    if (len) {
-        /* Create a new node, make sure it is allocated to at
-         * least PROTO_REPLY_CHUNK_BYTES */
-        size_t usable_size;
-        size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
-        tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
-        /* Take over the allocation's internal fragmentation */
-        tail->size = usable_size - sizeof(replBufBlock);
-        tail->used = len;
-        tail->refcount = 0;
-        tail->repl_offset = server.master_repl_offset - tail->used + 1;
-        tail->id = repl_block_id++;
-        memcpy(tail->buf, s, len);
-        listAddNodeTail(server.repl_buffer_blocks, tail);
-        /* We also count the list node memory into replication buffer memory. */
-        server.repl_buffer_mem += (usable_size + sizeof(listNode));
-        add_new_block = 1;
-        if (start_node == NULL) {
+    while(len > 0) {
+        size_t start_pos = 0; /* The position of referenced block to start sending. */
+        listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
+        int add_new_block = 0; /* Create new block if current block is total used. */
+        listNode *ln = listLast(server.repl_buffer_blocks);
+        replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+        /* Append to tail string when possible. */
+        if (tail && tail->size > tail->used) {
            start_node = listLast(server.repl_buffer_blocks);
-            start_pos = 0;
+            start_pos = tail->used;
+            /* Copy the part we can fit into the tail, and leave the rest for a
+             * new node */
+            size_t avail = tail->size - tail->used;
+            size_t copy = (avail >= len) ? len : avail;
+            memcpy(tail->buf + tail->used, s, copy);
+            tail->used += copy;
+            s += copy;
+            len -= copy;
+        }
+        if (len) {
+            /* Create a new node, make sure it is allocated to at
+             * least PROTO_REPLY_CHUNK_BYTES */
+            size_t usable_size;
+            /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
+             * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
+             * trim when we only still need to hold a small portion from them. */
+            size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), (size_t)server.repl_backlog_size / 16);
+            tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
+            /* Take over the allocation's internal fragmentation */
+            tail->size = usable_size - sizeof(replBufBlock);
+            size_t copy = (tail->size >= len) ? len : tail->size;
+            tail->used = copy;
+            tail->refcount = 0;
+            tail->repl_offset = server.master_repl_offset - tail->used + 1;
+            tail->id = repl_block_id++;
+            memcpy(tail->buf, s, copy);
+            listAddNodeTail(server.repl_buffer_blocks, tail);
+            /* We also count the list node memory into replication buffer memory. */
+            server.repl_buffer_mem += (usable_size + sizeof(listNode));
+            add_new_block = 1;
+            if (start_node == NULL) {
+                start_node = listLast(server.repl_buffer_blocks);
+                start_pos = 0;
+            }
+            s += copy;
+            len -= copy;
        }
-    }
-    /* For output buffer of replicas. */
-    listIter li;
-    listRewind(server.slaves,&li);
-    while((ln = listNext(&li))) {
-        client *slave = ln->value;
-        if (!canFeedReplicaReplBuffer(slave)) continue;
+        /* For output buffer of replicas. */
+        listIter li;
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            client *slave = ln->value;
+            if (!canFeedReplicaReplBuffer(slave)) continue;
+
+            /* Update shared replication buffer start position. */
+            if (slave->ref_repl_buf_node == NULL) {
+                slave->ref_repl_buf_node = start_node;
+                slave->ref_block_pos = start_pos;
+                /* Only increase the start block reference count. */
+                ((replBufBlock *)listNodeValue(start_node))->refcount++;
+            }
-        /* Update shared replication buffer start position. */
-        if (slave->ref_repl_buf_node == NULL) {
-            slave->ref_repl_buf_node = start_node;
-            slave->ref_block_pos = start_pos;
-            /* Only increase the start block reference count. */
-            ((replBufBlock *)listNodeValue(start_node))->refcount++;
+            /* Check output buffer limit only when add new block. */
+            if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
        }
-        /* Check output buffer limit only when add new block. */
-        if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
-    }
-
-    /* For replication backlog */
-    if (server.repl_backlog->ref_repl_buf_node == NULL) {
-        server.repl_backlog->ref_repl_buf_node = start_node;
-        /* Only increase the start block reference count. */
-        ((replBufBlock *)listNodeValue(start_node))->refcount++;
+        /* For replication backlog */
+        if (server.repl_backlog->ref_repl_buf_node == NULL) {
+            server.repl_backlog->ref_repl_buf_node = start_node;
+            /* Only increase the start block reference count. */
+            ((replBufBlock *)listNodeValue(start_node))->refcount++;
-        /* Replication buffer must be empty before adding replication stream
-         * into replication backlog. */
-        serverAssert(add_new_block == 1 && start_pos == 0);
-    }
-    if (add_new_block) {
-        createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+            /* Replication buffer must be empty before adding replication stream
+             * into replication backlog. */
+            serverAssert(add_new_block == 1 && start_pos == 0);
+        }
+        if (add_new_block) {
+            createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
+        }
+        /* Try to trim replication backlog since replication backlog may exceed
+         * our setting when we add replication stream. Note that it is important to
+         * try to trim at least one node since in the common case this is where one
+         * new backlog node is added and one should be removed. See also comments
+         * in freeMemoryGetNotCountedMemory for details. */
+        incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
    }
-    /* Try to trim replication backlog since replication backlog may exceed
-     * our setting when we add replication stream. Note that it is important to
-     * try to trim at least one node since in the common case this is where one
-     * new backlog node is added and one should be removed. See also comments
-     * in freeMemoryGetNotCountedMemory for details. */
-    incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
/* Propagate write commands to replication stream.