diff options
Diffstat (limited to 'src/replication.c')
-rw-r--r-- | src/replication.c | 520 |
1 files changed, 344 insertions, 176 deletions
diff --git a/src/replication.c b/src/replication.c index 1c8836c02..0629a4ca9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -33,6 +33,7 @@ #include "cluster.h" #include "bio.h" +#include <memory.h> #include <sys/time.h> #include <unistd.h> #include <fcntl.h> @@ -109,83 +110,94 @@ int bg_unlink(const char *filename) { void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); - server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); - server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - + server.repl_backlog = zmalloc(sizeof(replBacklog)); + server.repl_backlog->ref_repl_buf_node = NULL; + server.repl_backlog->unindexed_count = 0; + server.repl_backlog->blocks_index = raxNew(); + server.repl_backlog->histlen = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ - server.repl_backlog_off = server.master_repl_offset+1; + server.repl_backlog->offset = server.master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the - * server.cfg_repl_backlog_size and to resize the buffer and setup it so that + * server.repl_backlog_size and to resize the buffer and setup it so that * it contains the same data as the previous one (possibly less data, but * the most recent bytes, or the same data and more free space in case the * buffer is enlarged). */ void resizeReplicationBacklog(long long newsize) { if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; - if (server.cfg_repl_backlog_size == newsize) return; + if (server.repl_backlog_size == newsize) return; - server.cfg_repl_backlog_size = newsize; - if (server.repl_backlog != NULL) { - /* What we actually do is to flush the old buffer and realloc a new - * empty one. It will refill with new data incrementally. - * The reason is that copying a few gigabytes adds latency and even - * worse often we need to alloc additional space before freeing the - * old buffer. */ - zfree(server.repl_backlog); - server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); - server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - /* Next byte we have is... the next since the buffer is empty. */ - server.repl_backlog_off = server.master_repl_offset+1; - } + server.repl_backlog_size = newsize; + if (server.repl_backlog) + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); + if (server.repl_backlog == NULL) return; + + /* Decrease the start buffer node reference count. */ + if (server.repl_backlog->ref_repl_buf_node) { + replBufBlock *o = listNodeValue( + server.repl_backlog->ref_repl_buf_node); + serverAssert(o->refcount == 1); /* Last reference. */ + o->refcount--; + } + + /* Replication buffer blocks are completely released when we free the + * backlog, since the backlog is released only when there are no replicas + * and the backlog keeps the last reference of all blocks. */ + freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, + server.repl_backlog->blocks_index); + resetReplicationBuffer(); zfree(server.repl_backlog); server.repl_backlog = NULL; } -/* Add data to the replication backlog. - * This function also increments the global replication offset stored at - * server.master_repl_offset, because there is no case where we want to feed - * the backlog without incrementing the offset. */ -void feedReplicationBacklog(void *ptr, size_t len) { - unsigned char *p = ptr; +void resetReplicationBuffer(void) { + server.repl_buffer_mem = 0; + server.repl_buffer_blocks = listCreate(); + listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree); +} - server.master_repl_offset += len; +int canFeedReplicaReplBuffer(client *replica) { + /* Don't feed replicas that only want the RDB. */ + if (replica->flags & CLIENT_REPL_RDBONLY) return 0; - /* This is a circular buffer, so write as much data we can at every - * iteration and rewind the "idx" index if we reach the limit. */ - while(len) { - size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; - if (thislen > len) thislen = len; - memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); - server.repl_backlog_idx += thislen; - if (server.repl_backlog_idx == server.repl_backlog_size) - server.repl_backlog_idx = 0; - len -= thislen; - p += thislen; - server.repl_backlog_histlen += thislen; - } - if (server.repl_backlog_histlen > server.repl_backlog_size) - server.repl_backlog_histlen = server.repl_backlog_size; - /* Set the offset of the first byte we have in the backlog. */ - server.repl_backlog_off = server.master_repl_offset - - server.repl_backlog_histlen + 1; + /* Don't feed replicas that are still waiting for BGSAVE to start. */ + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; + + return 1; } -/* Wrapper for feedReplicationBacklog() that takes Redis string objects +/* Similar with 'prepareClientToWrite', note that we must call this function + * before feeding replication stream into global replication buffer, since + * clientHasPendingReplies in prepareClientToWrite will access the global + * replication buffer to make judgements. */ +int prepareReplicasToWrite(void) { + listIter li; + listNode *ln; + int prepared = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + if (prepareClientToWrite(slave) == C_ERR) continue; + prepared++; + } + + return prepared; +} + +/* Wrapper for feedReplicationBuffer() that takes Redis string objects * as input. */ -void feedReplicationBacklogWithObject(robj *o) { +void feedReplicationBufferWithObject(robj *o) { char llstr[LONG_STR_SIZE]; void *p; size_t len; @@ -197,27 +209,197 @@ void feedReplicationBacklogWithObject(robj *o) { len = sdslen(o->ptr); p = o->ptr; } - feedReplicationBacklog(p,len); + feedReplicationBuffer(p,len); } -int canFeedReplicaReplBuffer(client *replica) { - /* Don't feed replicas that only want the RDB. */ - if (replica->flags & CLIENT_REPL_RDBONLY) return 0; +/* Generally, we only have one replication buffer block to trim when replication + * backlog size exceeds our setting and no replica reference it. But if replica + * clients disconnect, we need to free many replication buffer blocks that are + * referenced. It would cost much time if there are a lots blocks to free, that + * will freeze server, so we trim replication backlog incrementally. */ +void incrementalTrimReplicationBacklog(size_t max_blocks) { + serverAssert(server.repl_backlog != NULL); - /* Don't feed replicas that are still waiting for BGSAVE to start. */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; + size_t trimmed_blocks = 0, trimmed_bytes = 0; + while (server.repl_backlog->histlen > server.repl_backlog_size && + trimmed_blocks < max_blocks) + { + /* We never trim backlog to less than one block. */ + if (listLength(server.repl_buffer_blocks) <= 1) break; + + /* Replicas increment the refcount of the first replication buffer block + * they refer to, in that case, we don't trim the backlog even if + * backlog_histlen exceeds backlog_size. This implicitly makes backlog + * bigger than our setting, but makes the master accept partial resync as + * much as possible. So that backlog must be the last reference of + * replication buffer blocks. */ + listNode *first = listFirst(server.repl_buffer_blocks); + serverAssert(first == server.repl_backlog->ref_repl_buf_node); + replBufBlock *fo = listNodeValue(first); + if (fo->refcount != 1) break; + + /* We don't try trim backlog if backlog valid size will be lessen than + * setting backlog size once we release the first repl buffer block. */ + if (server.repl_backlog->histlen - (long long)fo->size <= + server.repl_backlog_size) break; + + /* Decr refcount and release the first block later. */ + fo->refcount--; + trimmed_bytes += fo->size; + trimmed_blocks++; + + /* Go to use next replication buffer block node. */ + listNode *next = listNextNode(first); + server.repl_backlog->ref_repl_buf_node = next; + serverAssert(server.repl_backlog->ref_repl_buf_node != NULL); + /* Incr reference count to keep the new head node. */ + ((replBufBlock *)listNodeValue(next))->refcount++; + + /* Remove the node in recorded blocks. */ + uint64_t encoded_offset = htonu64(fo->repl_offset); + raxRemove(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL); + + /* Delete the first node from global replication buffer. */ + serverAssert(fo->refcount == 0 && fo->used == fo->size); + server.repl_buffer_mem -= (fo->size + + sizeof(listNode) + sizeof(replBufBlock)); + listDelNode(server.repl_buffer_blocks, first); + } + + server.repl_backlog->histlen -= trimmed_bytes; + /* Set the offset of the first byte we have in the backlog. */ + server.repl_backlog->offset = server.master_repl_offset - + server.repl_backlog->histlen + 1; +} - return 1; +/* Free replication buffer blocks that are referenced by this client. */ +void freeReplicaReferencedReplBuffer(client *replica) { + if (replica->ref_repl_buf_node != NULL) { + /* Decrease the start buffer node reference count. */ + replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); + serverAssert(o->refcount > 0); + o->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + replica->ref_repl_buf_node = NULL; + replica->ref_block_pos = 0; } -/* Propagate write commands to slaves, and populate the replication backlog - * as well. This function is used if the instance is a master: we use - * the commands received by our clients in order to create the replication - * stream. Instead if the instance is a slave and has sub-slaves attached, - * we use replicationFeedSlavesFromMasterStream() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln; +/* Append bytes into the global replication buffer list, replication backlog and + * all replica clients use replication buffers collectively, this function replace + * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, + * First we add buffer into global replication buffer block list, and then + * update replica / replication-backlog referenced node and block position. */ +void feedReplicationBuffer(char *s, size_t len) { + static long long repl_block_id = 0; + + if (server.repl_backlog == NULL) return; + server.master_repl_offset += len; + server.repl_backlog->histlen += len; + + /* Install write handler for all replicas. */ + prepareReplicasToWrite(); + + size_t start_pos = 0; /* The position of referenced blok to start sending. */ + listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ + int add_new_block = 0; /* Create new block if current block is total used. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + + /* Append to tail string when possible. */ + if (tail && tail->size > tail->used) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = tail->used; + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = (avail >= len) ? len : avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t usable_size; + size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len; + tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); + /* Take over the allocation's internal fragmentation */ + tail->size = usable_size - sizeof(replBufBlock); + tail->used = len; + tail->refcount = 0; + tail->repl_offset = server.master_repl_offset - tail->used + 1; + tail->id = repl_block_id++; + memcpy(tail->buf, s, len); + listAddNodeTail(server.repl_buffer_blocks, tail); + /* We also count the list node memory into replication buffer memory. */ + server.repl_buffer_mem += (usable_size + sizeof(listNode)); + add_new_block = 1; + if (start_node == NULL) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = 0; + } + } + + /* For output buffer of replicas. */ listIter li; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + + /* Update shared replication buffer start position. */ + if (slave->ref_repl_buf_node == NULL) { + slave->ref_repl_buf_node = start_node; + slave->ref_block_pos = start_pos; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + } + + /* Check output buffer limit only when add new block. */ + if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); + } + + /* For replication backlog */ + if (server.repl_backlog->ref_repl_buf_node == NULL) { + server.repl_backlog->ref_repl_buf_node = start_node; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + + /* Replication buffer must be empty before adding replication stream + * into replication backlog. */ + serverAssert(add_new_block == 1 && start_pos == 0); + } + if (add_new_block) { + /* To make search offset from replication buffer blocks quickly + * when replicas ask partial resynchronization, we create one index + * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ + server.repl_backlog->unindexed_count++; + if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { + uint64_t encoded_offset = htonu64(tail->repl_offset); + raxInsert(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), + listLast(server.repl_buffer_blocks), NULL); + server.repl_backlog->unindexed_count = 0; + } + } + /* Try to trim replication backlog since replication backlog may exceed + * our setting when we add replication stream. Note that it is important to + * try to trim at least one node since in the common case this is where one + * new backlog node is added and one should be removed. See also comments + * in freeMemoryGetNotCountedMemory for details. */ + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); +} + +/* Propagate write commands to replication stream. + * + * This function is used if the instance is a master: we use the commands + * received by our clients in order to create the replication stream. + * Instead if the instance is a replica and has sub-replicas attached, we use + * replicationFeedStreamFromMasterStream() */ +void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; @@ -252,68 +434,36 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { dictid_len, llstr)); } - /* Add the SELECT command into the backlog. */ - if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); - - /* Send it to slaves. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (!canFeedReplicaReplBuffer(slave)) continue; - addReply(slave,selectcmd); - } + feedReplicationBufferWithObject(selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; - /* Write the command to the replication backlog if any. */ - if (server.repl_backlog) { - char aux[LONG_STR_SIZE+3]; - - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - - for (j = 0; j < argc; j++) { - long objlen = stringObjectLen(argv[j]); + /* Write the command to the replication buffer if any. */ + char aux[LONG_STR_SIZE+3]; - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - feedReplicationBacklogWithObject(argv[j]); - feedReplicationBacklog(aux+len+1,2); - } - } - - /* Write the command to every slave. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; + /* Add the multi bulk reply length. */ + aux[0] = '*'; + len = ll2string(aux+1,sizeof(aux)-1,argc); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBuffer(aux,len+3); - if (!canFeedReplicaReplBuffer(slave)) continue; - - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ - - /* Add the multi bulk length. */ - addReplyArrayLen(slave,argc); + for (j = 0; j < argc; j++) { + long objlen = stringObjectLen(argv[j]); - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (j = 0; j < argc; j++) - addReplyBulk(slave,argv[j]); + /* We need to feed the buffer with the object as a bulk reply + * not just as a plain string, so create the $..CRLF payload len + * and add the final CRLF */ + aux[0] = '$'; + len = ll2string(aux+1,sizeof(aux)-1,objlen); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBuffer(aux,len+3); + feedReplicationBufferWithObject(argv[j]); + feedReplicationBuffer(aux+len+1,2); } } @@ -323,26 +473,24 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * guess what kind of bug it could be. */ void showLatestBacklog(void) { if (server.repl_backlog == NULL) return; + if (listLength(server.repl_buffer_blocks) == 0) return; - long long dumplen = 256; - if (server.repl_backlog_histlen < dumplen) - dumplen = server.repl_backlog_histlen; - - /* Identify the first byte to dump. */ - long long idx = - (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) % - server.repl_backlog_size; + size_t dumplen = 256; + if (server.repl_backlog->histlen < (long long)dumplen) + dumplen = server.repl_backlog->histlen; - /* Scan the circular buffer to collect 'dumplen' bytes. */ sds dump = sdsempty(); + listNode *node = listLast(server.repl_buffer_blocks); while(dumplen) { - long long thislen = - ((server.repl_backlog_size - idx) < dumplen) ? - (server.repl_backlog_size - idx) : dumplen; - - dump = sdscatrepr(dump,server.repl_backlog+idx,thislen); + if (node == NULL) break; + replBufBlock *o = listNodeValue(node); + size_t thislen = o->used >= dumplen ? dumplen : o->used; + sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); + sds tmp = sdscatsds(head, dump); + sdsfree(dump); + dump = tmp; dumplen -= thislen; - idx = 0; + node = listPrevNode(node); } /* Finally log such bytes: this is vital debugging info to @@ -354,10 +502,7 @@ void showLatestBacklog(void) { /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include <ctype.h> -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { - listNode *ln; - listIter li; - +void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { @@ -368,14 +513,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle printf("\n"); } - if (server.repl_backlog) feedReplicationBacklog(buf,buflen); - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (!canFeedReplicaReplBuffer(slave)) continue; - addReplyProto(slave,buf,buflen); - } + /* There must be replication backlog if having attached slaves. */ + if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); + if (server.repl_backlog) feedReplicationBuffer(buf,buflen); } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -422,11 +562,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, /* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; + long long skip; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); - if (server.repl_backlog_histlen == 0) { + if (server.repl_backlog->histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } @@ -434,41 +574,58 @@ long long addReplyReplicationBacklog(client *c, long long offset) { serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", - server.repl_backlog_off); + server.repl_backlog->offset); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", - server.repl_backlog_histlen); - serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", - server.repl_backlog_idx); + server.repl_backlog->histlen); /* Compute the amount of bytes we need to discard. */ - skip = offset - server.repl_backlog_off; + skip = offset - server.repl_backlog->offset; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actually our - * server.repl_backlog_off byte. */ - j = (server.repl_backlog_idx + - (server.repl_backlog_size-server.repl_backlog_histlen)) % - server.repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); + /* Iterate recorded blocks, quickly search the approximate node. */ + listNode *node = NULL; + if (raxSize(server.repl_backlog->blocks_index) > 0) { + uint64_t encoded_offset = htonu64(offset); + raxIterator ri; + raxStart(&ri, server.repl_backlog->blocks_index); + raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t)); + if (raxEOF(&ri)) { + /* No found, so search from the last recorded node. */ + raxSeek(&ri, "$", NULL, 0); + raxPrev(&ri); + node = (listNode *)ri.data; + } else { + raxPrev(&ri); /* Skip the sought node. */ + /* We should search from the prev node since the offset of current + * sought node exceeds searching offset. */ + if (raxPrev(&ri)) + node = (listNode *)ri.data; + else + node = server.repl_backlog->ref_repl_buf_node; + } + raxStop(&ri); + } else { + /* No recorded blocks, just from the start node to search. */ + node = server.repl_backlog->ref_repl_buf_node; + } - /* Discard the amount of data to seek to the specified 'offset'. */ - j = (j + skip) % server.repl_backlog_size; + /* Search the exact node. */ + while (node != NULL) { + replBufBlock *o = listNodeValue(node); + if (o->repl_offset + (long long)o->used >= offset) break; + node = listNextNode(node); + } + serverAssert(node != NULL); - /* Feed slave with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. */ - len = server.repl_backlog_histlen - skip; - serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - while(len) { - long long thislen = - ((server.repl_backlog_size - j) < len) ? - (server.repl_backlog_size - j) : len; + /* Install a writer handler first.*/ + prepareClientToWrite(c); + /* Setting output buffer of the replica. */ + replBufBlock *o = listNodeValue(node); + o->refcount++; + c->ref_repl_buf_node = node; + c->ref_block_pos = offset - o->repl_offset; - serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); - addReplyProto(c,server.repl_backlog + j, thislen); - len -= thislen; - j = 0; - } - return server.repl_backlog_histlen - skip; + return server.repl_backlog->histlen - skip; } /* Return the offset to provide as reply to the PSYNC command received @@ -569,8 +726,8 @@ int masterTryPartialResynchronization(client *c) { /* We still have the data our slave is asking for? */ if (!server.repl_backlog || - psync_offset < server.repl_backlog_off || - psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) + psync_offset < server.repl_backlog->offset || + psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen)) { serverLog(LL_NOTICE, "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); @@ -853,7 +1010,8 @@ void syncCommand(client *c) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. * We don't copy buffer if clients don't want. */ - if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); + if (!(c->flags & CLIENT_REPL_RDBONLY)) + copyReplicaOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -3488,6 +3646,16 @@ void replicationCron(void) { * with any persistence. */ removeRDBUsedToSyncReplicas(); + /* Sanity check replication buffer, the first block of replication buffer blocks + * must be referenced by someone, since it will be freed when not referenced, + * otherwise, server will OOM. also, its refcount must not be more than + * replicas number + 1(replication backlog). */ + if (listLength(server.repl_buffer_blocks) > 0) { + replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); + serverAssert(o->refcount > 0 && + o->refcount <= (int)listLength(server.slaves)+1); + } + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ |