summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c520
1 files changed, 344 insertions, 176 deletions
diff --git a/src/replication.c b/src/replication.c
index 1c8836c02..0629a4ca9 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -33,6 +33,7 @@
#include "cluster.h"
#include "bio.h"
+#include <memory.h>
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
@@ -109,83 +110,94 @@ int bg_unlink(const char *filename) {
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
- server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
- server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
-
+ server.repl_backlog = zmalloc(sizeof(replBacklog));
+ server.repl_backlog->ref_repl_buf_node = NULL;
+ server.repl_backlog->unindexed_count = 0;
+ server.repl_backlog->blocks_index = raxNew();
+ server.repl_backlog->histlen = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
- server.repl_backlog_off = server.master_repl_offset+1;
+ server.repl_backlog->offset = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
- * server.cfg_repl_backlog_size and to resize the buffer and setup it so that
+ * server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
void resizeReplicationBacklog(long long newsize) {
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
- if (server.cfg_repl_backlog_size == newsize) return;
+ if (server.repl_backlog_size == newsize) return;
- server.cfg_repl_backlog_size = newsize;
- if (server.repl_backlog != NULL) {
- /* What we actually do is to flush the old buffer and realloc a new
- * empty one. It will refill with new data incrementally.
- * The reason is that copying a few gigabytes adds latency and even
- * worse often we need to alloc additional space before freeing the
- * old buffer. */
- zfree(server.repl_backlog);
- server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
- server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
- /* Next byte we have is... the next since the buffer is empty. */
- server.repl_backlog_off = server.master_repl_offset+1;
- }
+ server.repl_backlog_size = newsize;
+ if (server.repl_backlog)
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
+ if (server.repl_backlog == NULL) return;
+
+ /* Decrease the start buffer node reference count. */
+ if (server.repl_backlog->ref_repl_buf_node) {
+ replBufBlock *o = listNodeValue(
+ server.repl_backlog->ref_repl_buf_node);
+ serverAssert(o->refcount == 1); /* Last reference. */
+ o->refcount--;
+ }
+
+ /* Replication buffer blocks are completely released when we free the
+ * backlog, since the backlog is released only when there are no replicas
+ * and the backlog keeps the last reference of all blocks. */
+ freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks,
+ server.repl_backlog->blocks_index);
+ resetReplicationBuffer();
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
-/* Add data to the replication backlog.
- * This function also increments the global replication offset stored at
- * server.master_repl_offset, because there is no case where we want to feed
- * the backlog without incrementing the offset. */
-void feedReplicationBacklog(void *ptr, size_t len) {
- unsigned char *p = ptr;
+void resetReplicationBuffer(void) {
+ server.repl_buffer_mem = 0;
+ server.repl_buffer_blocks = listCreate();
+ listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree);
+}
- server.master_repl_offset += len;
+int canFeedReplicaReplBuffer(client *replica) {
+ /* Don't feed replicas that only want the RDB. */
+ if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
- /* This is a circular buffer, so write as much data we can at every
- * iteration and rewind the "idx" index if we reach the limit. */
- while(len) {
- size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
- if (thislen > len) thislen = len;
- memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
- server.repl_backlog_idx += thislen;
- if (server.repl_backlog_idx == server.repl_backlog_size)
- server.repl_backlog_idx = 0;
- len -= thislen;
- p += thislen;
- server.repl_backlog_histlen += thislen;
- }
- if (server.repl_backlog_histlen > server.repl_backlog_size)
- server.repl_backlog_histlen = server.repl_backlog_size;
- /* Set the offset of the first byte we have in the backlog. */
- server.repl_backlog_off = server.master_repl_offset -
- server.repl_backlog_histlen + 1;
+ /* Don't feed replicas that are still waiting for BGSAVE to start. */
+ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
+
+ return 1;
}
-/* Wrapper for feedReplicationBacklog() that takes Redis string objects
+/* Similar with 'prepareClientToWrite', note that we must call this function
+ * before feeding replication stream into global replication buffer, since
+ * clientHasPendingReplies in prepareClientToWrite will access the global
+ * replication buffer to make judgements. */
+int prepareReplicasToWrite(void) {
+ listIter li;
+ listNode *ln;
+ int prepared = 0;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+ if (prepareClientToWrite(slave) == C_ERR) continue;
+ prepared++;
+ }
+
+ return prepared;
+}
+
+/* Wrapper for feedReplicationBuffer() that takes Redis string objects
* as input. */
-void feedReplicationBacklogWithObject(robj *o) {
+void feedReplicationBufferWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
@@ -197,27 +209,197 @@ void feedReplicationBacklogWithObject(robj *o) {
len = sdslen(o->ptr);
p = o->ptr;
}
- feedReplicationBacklog(p,len);
+ feedReplicationBuffer(p,len);
}
-int canFeedReplicaReplBuffer(client *replica) {
- /* Don't feed replicas that only want the RDB. */
- if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
+/* Generally, we only have one replication buffer block to trim when replication
+ * backlog size exceeds our setting and no replica reference it. But if replica
+ * clients disconnect, we need to free many replication buffer blocks that are
+ * referenced. It would cost much time if there are a lots blocks to free, that
+ * will freeze server, so we trim replication backlog incrementally. */
+void incrementalTrimReplicationBacklog(size_t max_blocks) {
+ serverAssert(server.repl_backlog != NULL);
- /* Don't feed replicas that are still waiting for BGSAVE to start. */
- if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
+ size_t trimmed_blocks = 0, trimmed_bytes = 0;
+ while (server.repl_backlog->histlen > server.repl_backlog_size &&
+ trimmed_blocks < max_blocks)
+ {
+ /* We never trim backlog to less than one block. */
+ if (listLength(server.repl_buffer_blocks) <= 1) break;
+
+ /* Replicas increment the refcount of the first replication buffer block
+ * they refer to, in that case, we don't trim the backlog even if
+ * backlog_histlen exceeds backlog_size. This implicitly makes backlog
+ * bigger than our setting, but makes the master accept partial resync as
+ * much as possible. So that backlog must be the last reference of
+ * replication buffer blocks. */
+ listNode *first = listFirst(server.repl_buffer_blocks);
+ serverAssert(first == server.repl_backlog->ref_repl_buf_node);
+ replBufBlock *fo = listNodeValue(first);
+ if (fo->refcount != 1) break;
+
+ /* We don't try trim backlog if backlog valid size will be lessen than
+ * setting backlog size once we release the first repl buffer block. */
+ if (server.repl_backlog->histlen - (long long)fo->size <=
+ server.repl_backlog_size) break;
+
+ /* Decr refcount and release the first block later. */
+ fo->refcount--;
+ trimmed_bytes += fo->size;
+ trimmed_blocks++;
+
+ /* Go to use next replication buffer block node. */
+ listNode *next = listNextNode(first);
+ server.repl_backlog->ref_repl_buf_node = next;
+ serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
+ /* Incr reference count to keep the new head node. */
+ ((replBufBlock *)listNodeValue(next))->refcount++;
+
+ /* Remove the node in recorded blocks. */
+ uint64_t encoded_offset = htonu64(fo->repl_offset);
+ raxRemove(server.repl_backlog->blocks_index,
+ (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);
+
+ /* Delete the first node from global replication buffer. */
+ serverAssert(fo->refcount == 0 && fo->used == fo->size);
+ server.repl_buffer_mem -= (fo->size +
+ sizeof(listNode) + sizeof(replBufBlock));
+ listDelNode(server.repl_buffer_blocks, first);
+ }
+
+ server.repl_backlog->histlen -= trimmed_bytes;
+ /* Set the offset of the first byte we have in the backlog. */
+ server.repl_backlog->offset = server.master_repl_offset -
+ server.repl_backlog->histlen + 1;
+}
- return 1;
+/* Free replication buffer blocks that are referenced by this client. */
+void freeReplicaReferencedReplBuffer(client *replica) {
+ if (replica->ref_repl_buf_node != NULL) {
+ /* Decrease the start buffer node reference count. */
+ replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
+ serverAssert(o->refcount > 0);
+ o->refcount--;
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+ }
+ replica->ref_repl_buf_node = NULL;
+ replica->ref_block_pos = 0;
}
-/* Propagate write commands to slaves, and populate the replication backlog
- * as well. This function is used if the instance is a master: we use
- * the commands received by our clients in order to create the replication
- * stream. Instead if the instance is a slave and has sub-slaves attached,
- * we use replicationFeedSlavesFromMasterStream() */
-void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
- listNode *ln;
+/* Append bytes into the global replication buffer list, replication backlog and
+ * all replica clients use replication buffers collectively, this function replace
+ * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
+ * First we add buffer into global replication buffer block list, and then
+ * update replica / replication-backlog referenced node and block position. */
+void feedReplicationBuffer(char *s, size_t len) {
+ static long long repl_block_id = 0;
+
+ if (server.repl_backlog == NULL) return;
+ server.master_repl_offset += len;
+ server.repl_backlog->histlen += len;
+
+ /* Install write handler for all replicas. */
+ prepareReplicasToWrite();
+
+ size_t start_pos = 0; /* The position of referenced blok to start sending. */
+ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
+ int add_new_block = 0; /* Create new block if current block is total used. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+ /* Append to tail string when possible. */
+ if (tail && tail->size > tail->used) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = tail->used;
+ /* Copy the part we can fit into the tail, and leave the rest for a
+ * new node */
+ size_t avail = tail->size - tail->used;
+ size_t copy = (avail >= len) ? len : avail;
+ memcpy(tail->buf + tail->used, s, copy);
+ tail->used += copy;
+ s += copy;
+ len -= copy;
+ }
+ if (len) {
+ /* Create a new node, make sure it is allocated to at
+ * least PROTO_REPLY_CHUNK_BYTES */
+ size_t usable_size;
+ size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
+ tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
+ /* Take over the allocation's internal fragmentation */
+ tail->size = usable_size - sizeof(replBufBlock);
+ tail->used = len;
+ tail->refcount = 0;
+ tail->repl_offset = server.master_repl_offset - tail->used + 1;
+ tail->id = repl_block_id++;
+ memcpy(tail->buf, s, len);
+ listAddNodeTail(server.repl_buffer_blocks, tail);
+ /* We also count the list node memory into replication buffer memory. */
+ server.repl_buffer_mem += (usable_size + sizeof(listNode));
+ add_new_block = 1;
+ if (start_node == NULL) {
+ start_node = listLast(server.repl_buffer_blocks);
+ start_pos = 0;
+ }
+ }
+
+ /* For output buffer of replicas. */
listIter li;
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (!canFeedReplicaReplBuffer(slave)) continue;
+
+ /* Update shared replication buffer start position. */
+ if (slave->ref_repl_buf_node == NULL) {
+ slave->ref_repl_buf_node = start_node;
+ slave->ref_block_pos = start_pos;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+ }
+
+ /* Check output buffer limit only when add new block. */
+ if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
+ }
+
+ /* For replication backlog */
+ if (server.repl_backlog->ref_repl_buf_node == NULL) {
+ server.repl_backlog->ref_repl_buf_node = start_node;
+ /* Only increase the start block reference count. */
+ ((replBufBlock *)listNodeValue(start_node))->refcount++;
+
+ /* Replication buffer must be empty before adding replication stream
+ * into replication backlog. */
+ serverAssert(add_new_block == 1 && start_pos == 0);
+ }
+ if (add_new_block) {
+ /* To make search offset from replication buffer blocks quickly
+ * when replicas ask partial resynchronization, we create one index
+ * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
+ server.repl_backlog->unindexed_count++;
+ if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
+ uint64_t encoded_offset = htonu64(tail->repl_offset);
+ raxInsert(server.repl_backlog->blocks_index,
+ (unsigned char*)&encoded_offset, sizeof(uint64_t),
+ listLast(server.repl_buffer_blocks), NULL);
+ server.repl_backlog->unindexed_count = 0;
+ }
+ }
+ /* Try to trim replication backlog since replication backlog may exceed
+ * our setting when we add replication stream. Note that it is important to
+ * try to trim at least one node since in the common case this is where one
+ * new backlog node is added and one should be removed. See also comments
+ * in freeMemoryGetNotCountedMemory for details. */
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+}
+
+/* Propagate write commands to replication stream.
+ *
+ * This function is used if the instance is a master: we use the commands
+ * received by our clients in order to create the replication stream.
+ * Instead if the instance is a replica and has sub-replicas attached, we use
+ * replicationFeedStreamFromMasterStream() */
+void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];
@@ -252,68 +434,36 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
dictid_len, llstr));
}
- /* Add the SELECT command into the backlog. */
- if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
-
- /* Send it to slaves. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
- addReply(slave,selectcmd);
- }
+ feedReplicationBufferWithObject(selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
- /* Write the command to the replication backlog if any. */
- if (server.repl_backlog) {
- char aux[LONG_STR_SIZE+3];
-
- /* Add the multi bulk reply length. */
- aux[0] = '*';
- len = ll2string(aux+1,sizeof(aux)-1,argc);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
-
- for (j = 0; j < argc; j++) {
- long objlen = stringObjectLen(argv[j]);
+ /* Write the command to the replication buffer if any. */
+ char aux[LONG_STR_SIZE+3];
- /* We need to feed the buffer with the object as a bulk reply
- * not just as a plain string, so create the $..CRLF payload len
- * and add the final CRLF */
- aux[0] = '$';
- len = ll2string(aux+1,sizeof(aux)-1,objlen);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
- feedReplicationBacklogWithObject(argv[j]);
- feedReplicationBacklog(aux+len+1,2);
- }
- }
-
- /* Write the command to every slave. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
+ /* Add the multi bulk reply length. */
+ aux[0] = '*';
+ len = ll2string(aux+1,sizeof(aux)-1,argc);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBuffer(aux,len+3);
- if (!canFeedReplicaReplBuffer(slave)) continue;
-
- /* Feed slaves that are waiting for the initial SYNC (so these commands
- * are queued in the output buffer until the initial SYNC completes),
- * or are already in sync with the master. */
-
- /* Add the multi bulk length. */
- addReplyArrayLen(slave,argc);
+ for (j = 0; j < argc; j++) {
+ long objlen = stringObjectLen(argv[j]);
- /* Finally any additional argument that was not stored inside the
- * static buffer if any (from j to argc). */
- for (j = 0; j < argc; j++)
- addReplyBulk(slave,argv[j]);
+ /* We need to feed the buffer with the object as a bulk reply
+ * not just as a plain string, so create the $..CRLF payload len
+ * and add the final CRLF */
+ aux[0] = '$';
+ len = ll2string(aux+1,sizeof(aux)-1,objlen);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBuffer(aux,len+3);
+ feedReplicationBufferWithObject(argv[j]);
+ feedReplicationBuffer(aux+len+1,2);
}
}
@@ -323,26 +473,24 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (server.repl_backlog == NULL) return;
+ if (listLength(server.repl_buffer_blocks) == 0) return;
- long long dumplen = 256;
- if (server.repl_backlog_histlen < dumplen)
- dumplen = server.repl_backlog_histlen;
-
- /* Identify the first byte to dump. */
- long long idx =
- (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
- server.repl_backlog_size;
+ size_t dumplen = 256;
+ if (server.repl_backlog->histlen < (long long)dumplen)
+ dumplen = server.repl_backlog->histlen;
- /* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
+ listNode *node = listLast(server.repl_buffer_blocks);
while(dumplen) {
- long long thislen =
- ((server.repl_backlog_size - idx) < dumplen) ?
- (server.repl_backlog_size - idx) : dumplen;
-
- dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
+ if (node == NULL) break;
+ replBufBlock *o = listNodeValue(node);
+ size_t thislen = o->used >= dumplen ? dumplen : o->used;
+ sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen);
+ sds tmp = sdscatsds(head, dump);
+ sdsfree(dump);
+ dump = tmp;
dumplen -= thislen;
- idx = 0;
+ node = listPrevNode(node);
}
/* Finally log such bytes: this is vital debugging info to
@@ -354,10 +502,7 @@ void showLatestBacklog(void) {
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
-void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
- listNode *ln;
- listIter li;
-
+void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) {
/* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */
if (0) {
@@ -368,14 +513,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
printf("\n");
}
- if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
- addReplyProto(slave,buf,buflen);
- }
+ /* There must be replication backlog if having attached slaves. */
+ if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL);
+ if (server.repl_backlog) feedReplicationBuffer(buf,buflen);
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@@ -422,11 +562,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
- long long j, skip, len;
+ long long skip;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
- if (server.repl_backlog_histlen == 0) {
+ if (server.repl_backlog->histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
@@ -434,41 +574,58 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
- server.repl_backlog_off);
+ server.repl_backlog->offset);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
- server.repl_backlog_histlen);
- serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
- server.repl_backlog_idx);
+ server.repl_backlog->histlen);
/* Compute the amount of bytes we need to discard. */
- skip = offset - server.repl_backlog_off;
+ skip = offset - server.repl_backlog->offset;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
- /* Point j to the oldest byte, that is actually our
- * server.repl_backlog_off byte. */
- j = (server.repl_backlog_idx +
- (server.repl_backlog_size-server.repl_backlog_histlen)) %
- server.repl_backlog_size;
- serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
+ /* Iterate recorded blocks, quickly search the approximate node. */
+ listNode *node = NULL;
+ if (raxSize(server.repl_backlog->blocks_index) > 0) {
+ uint64_t encoded_offset = htonu64(offset);
+ raxIterator ri;
+ raxStart(&ri, server.repl_backlog->blocks_index);
+ raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t));
+ if (raxEOF(&ri)) {
+ /* No found, so search from the last recorded node. */
+ raxSeek(&ri, "$", NULL, 0);
+ raxPrev(&ri);
+ node = (listNode *)ri.data;
+ } else {
+ raxPrev(&ri); /* Skip the sought node. */
+ /* We should search from the prev node since the offset of current
+ * sought node exceeds searching offset. */
+ if (raxPrev(&ri))
+ node = (listNode *)ri.data;
+ else
+ node = server.repl_backlog->ref_repl_buf_node;
+ }
+ raxStop(&ri);
+ } else {
+ /* No recorded blocks, just from the start node to search. */
+ node = server.repl_backlog->ref_repl_buf_node;
+ }
- /* Discard the amount of data to seek to the specified 'offset'. */
- j = (j + skip) % server.repl_backlog_size;
+ /* Search the exact node. */
+ while (node != NULL) {
+ replBufBlock *o = listNodeValue(node);
+ if (o->repl_offset + (long long)o->used >= offset) break;
+ node = listNextNode(node);
+ }
+ serverAssert(node != NULL);
- /* Feed slave with data. Since it is a circular buffer we have to
- * split the reply in two parts if we are cross-boundary. */
- len = server.repl_backlog_histlen - skip;
- serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
- while(len) {
- long long thislen =
- ((server.repl_backlog_size - j) < len) ?
- (server.repl_backlog_size - j) : len;
+ /* Install a writer handler first.*/
+ prepareClientToWrite(c);
+ /* Setting output buffer of the replica. */
+ replBufBlock *o = listNodeValue(node);
+ o->refcount++;
+ c->ref_repl_buf_node = node;
+ c->ref_block_pos = offset - o->repl_offset;
- serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
- addReplyProto(c,server.repl_backlog + j, thislen);
- len -= thislen;
- j = 0;
- }
- return server.repl_backlog_histlen - skip;
+ return server.repl_backlog->histlen - skip;
}
/* Return the offset to provide as reply to the PSYNC command received
@@ -569,8 +726,8 @@ int masterTryPartialResynchronization(client *c) {
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
- psync_offset < server.repl_backlog_off ||
- psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
+ psync_offset < server.repl_backlog->offset ||
+ psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
@@ -853,7 +1010,8 @@ void syncCommand(client *c) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
- if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
+ if (!(c->flags & CLIENT_REPL_RDBONLY))
+ copyReplicaOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
@@ -3488,6 +3646,16 @@ void replicationCron(void) {
* with any persistence. */
removeRDBUsedToSyncReplicas();
+ /* Sanity check replication buffer, the first block of replication buffer blocks
+ * must be referenced by someone, since it will be freed when not referenced,
+ * otherwise, server will OOM. also, its refcount must not be more than
+ * replicas number + 1(replication backlog). */
+ if (listLength(server.repl_buffer_blocks) > 0) {
+ replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
+ serverAssert(o->refcount > 0 &&
+ o->refcount <= (int)listLength(server.slaves)+1);
+ }
+
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */