/* Asynchronous replication implementation. * * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. 
*/ #include "server.h" #include "cluster.h" #include "bio.h" #include #include #include #include #include void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); void putSlaveOnline(client *slave); int cancelReplicationHandshake(int reconnect); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case * the instance is configured to have no persistence. */ int RDBGeneratedByReplication = 0; /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the slave ip:listening_port * pair. Mostly useful for logging, since we want to log a slave using its * IP address and its listening port which is more clear for the user, for * example: "Closing connection with replica 10.1.2.3:6380". */ char *replicationGetSlaveName(client *c) { static char buf[NET_HOST_PORT_STR_LEN]; char ip[NET_IP_STR_LEN]; ip[0] = '\0'; buf[0] = '\0'; if (c->slave_addr || connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) { char *addr = c->slave_addr ? c->slave_addr : ip; if (c->slave_listening_port) anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port); else snprintf(buf,sizeof(buf),"%s:",addr); } else { snprintf(buf,sizeof(buf),"client id #%llu", (unsigned long long) c->id); } return buf; } /* Plain unlink() can block for quite some time in order to actually apply * the file deletion to the filesystem. This call removes the file in a * background thread instead. We actually just do close() in the thread, * by using the fact that if there is another instance of the same file open, * the foreground unlink() will only remove the fs name, and deleting the * file's storage space will only happen once the last reference is lost. */ int bg_unlink(const char *filename) { int fd = open(filename,O_RDONLY|O_NONBLOCK); if (fd == -1) { /* Can't open the file? 
Fall back to unlinking in the main thread. */ return unlink(filename); } else { /* The following unlink() removes the name but doesn't free the * file contents because a process still has it open. */ int retval = unlink(filename); if (retval == -1) { /* If we got an unlink error, we just return it, closing the * new reference we have to the file. */ int old_errno = errno; close(fd); /* This would overwrite our errno. So we saved it. */ errno = old_errno; return -1; } bioCreateCloseJob(fd); return 0; /* Success. */ } } /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ server.repl_backlog_off = server.master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the * server.cfg_repl_backlog_size and to resize the buffer and setup it so that * it contains the same data as the previous one (possibly less data, but * the most recent bytes, or the same data and more free space in case the * buffer is enlarged). */ void resizeReplicationBacklog(long long newsize) { if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (server.cfg_repl_backlog_size == newsize) return; server.cfg_repl_backlog_size = newsize; if (server.repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. 
* The reason is that copying a few gigabytes adds latency and even * worse often we need to alloc additional space before freeing the * old buffer. */ zfree(server.repl_backlog); server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. */ server.repl_backlog_off = server.master_repl_offset+1; } } void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; } /* Add data to the replication backlog. * This function also increments the global replication offset stored at * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ while(len) { size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; if (thislen > len) thislen = len; memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); server.repl_backlog_idx += thislen; if (server.repl_backlog_idx == server.repl_backlog_size) server.repl_backlog_idx = 0; len -= thislen; p += thislen; server.repl_backlog_histlen += thislen; } if (server.repl_backlog_histlen > server.repl_backlog_size) server.repl_backlog_histlen = server.repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1; } /* Wrapper for feedReplicationBacklog() that takes Redis string objects * as input. 
*/ void feedReplicationBacklogWithObject(robj *o) { char llstr[LONG_STR_SIZE]; void *p; size_t len; if (o->encoding == OBJ_ENCODING_INT) { len = ll2string(llstr,sizeof(llstr),(long)o->ptr); p = llstr; } else { len = sdslen(o->ptr); p = o->ptr; } feedReplicationBacklog(p,len); } int canFeedReplicaReplBuffer(client *replica) { /* Don't feed replicas that only want the RDB. */ if (replica->flags & CLIENT_REPL_RDBONLY) return 0; /* Don't feed replicas that are still waiting for BGSAVE to start. */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; return 1; } /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication * stream. Instead if the instance is a slave and has sub-slaves attached, * we use replicationFeedSlavesFromMasterStream() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ if (server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. 
*/ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } /* Add the SELECT command into the backlog. */ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; addReply(slave,selectcmd); } if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; /* Write the command to the replication backlog if any. */ if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } /* Write the command to every slave. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ /* Add the multi bulk length. */ addReplyArrayLen(slave,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). 
*/ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } } /* This is a debugging function that gets called when we detect something * wrong with the replication protocol: the goal is to peek into the * replication backlog and show a few final bytes to make simpler to * guess what kind of bug it could be. */ void showLatestBacklog(void) { if (server.repl_backlog == NULL) return; long long dumplen = 256; if (server.repl_backlog_histlen < dumplen) dumplen = server.repl_backlog_histlen; /* Identify the first byte to dump. */ long long idx = (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) % server.repl_backlog_size; /* Scan the circular buffer to collect 'dumplen' bytes. */ sds dump = sdsempty(); while(dumplen) { long long thislen = ((server.repl_backlog_size - idx) < dumplen) ? (server.repl_backlog_size - idx) : dumplen; dump = sdscatrepr(dump,server.repl_backlog+idx,thislen); dumplen -= thislen; idx = 0; } /* Finally log such bytes: this is vital debugging info to * understand what happened. */ serverLog(LL_WARNING,"Latest backlog is: '%s'", dump); sdsfree(dump); } /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { listNode *ln; listIter li; /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { printf("%zu:",buflen); for (size_t j = 0; j < buflen; j++) { printf("%c", isprint(buf[j]) ? 
buf[j] : '.'); } printf("\n"); } if (server.repl_backlog) feedReplicationBacklog(buf,buflen); listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; addReplyProto(slave,buf,buflen); } } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { if (!(listLength(server.monitors) && !server.loading)) return; listNode *ln; listIter li; int j; sds cmdrepr = sdsnew("+"); robj *cmdobj; struct timeval tv; gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); if (c->flags & CLIENT_LUA) { cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); } else if (c->flags & CLIENT_UNIX_SOCKET) { cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); } else { cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); } for (j = 0; j < argc; j++) { if (argv[j]->encoding == OBJ_ENCODING_INT) { cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); } else { cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, sdslen(argv[j]->ptr)); } if (j != argc-1) cmdrepr = sdscatlen(cmdrepr," ",1); } cmdrepr = sdscatlen(cmdrepr,"\r\n",2); cmdobj = createObject(OBJ_STRING,cmdrepr); listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = ln->value; addReply(monitor,cmdobj); } decrRefCount(cmdobj); } /* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. 
*/
/* Returns the number of bytes queued to the client, which is 0 when the
 * backlog holds no history. No range check is performed on 'offset' here:
 * the data is read from the circular buffer in at most two contiguous
 * pieces (up to the end of the buffer, then from its beginning). */
long long addReplyReplicationBacklog(client *c, long long offset) {
    long long pos, to_skip, remaining;

    serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

    /* Empty backlog: nothing to send. */
    if (server.repl_backlog_histlen == 0) {
        serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
        return 0;
    }

    serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
        server.repl_backlog_size);
    serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
        server.repl_backlog_off);
    serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
        server.repl_backlog_histlen);
    serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
        server.repl_backlog_idx);

    /* Bytes of history preceding the requested offset: they are not sent. */
    to_skip = offset - server.repl_backlog_off;
    serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", to_skip);

    /* Locate the oldest byte of the history inside the circular buffer:
     * it is the one whose replication offset is server.repl_backlog_off. */
    pos = (server.repl_backlog_idx +
           (server.repl_backlog_size-server.repl_backlog_histlen)) %
          server.repl_backlog_size;
    serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", pos);

    /* Move forward to the byte matching the requested 'offset'. */
    pos = (pos + to_skip) % server.repl_backlog_size;

    /* Queue the data. A range crossing the end of the buffer is sent as
     * two separate replies. */
    remaining = server.repl_backlog_histlen - to_skip;
    serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", remaining);
    while (remaining != 0) {
        long long chunk = remaining;

        /* Never read past the end of the buffer in a single step. */
        if ((server.repl_backlog_size - pos) < remaining)
            chunk = server.repl_backlog_size - pos;

        serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", chunk);
        addReplyProto(c,server.repl_backlog + pos, chunk);
        remaining -= chunk;
        pos = 0; /* Whatever is left starts at the buffer beginning. */
    }
    return server.repl_backlog_histlen - to_skip;
}

/* Return the offset to provide as reply to the PSYNC command received
 * from the slave. The returned value is only valid immediately after
 * the BGSAVE process started and before executing any other command
 * from clients.
*/
long long getPsyncInitialOffset(void) {
    /* The initial offset is simply the current master replication offset. */
    long long initial_offset = server.master_repl_offset;
    return initial_offset;
}

/* Prepare 'slave' for a full resynchronization and, unless it used the
 * legacy SYNC command, tell it so with a +FULLRESYNC reply.
 *
 * Effects:
 *
 * 1) 'offset' is stored in slave->psync_initial_offset, so that slaves
 *    attaching later to the same background RDB save (duplicating this
 *    client output buffer) can read the right offset from this slave.
 * 2) The slave state becomes WAIT_BGSAVE_END: from now on differences
 *    are accumulated for it.
 * 3) server.slaveseldb is reset to -1, so the replication stream will
 *    emit a SELECT again and the incremental differences of the new slave
 *    start by selecting the right database.
 *
 * Call this right after a BGSAVE for replication was successfully started,
 * or when attaching the slave to one that is already in progress.
 *
 * Returns C_OK on success. If the reply can't be fully written to the
 * connection the client is scheduled to be freed and C_ERR is returned. */
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char reply[128];
    int replylen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

    /* Incremental changes will be accumulated for this slave too: force
     * the next propagated command to be preceded by a SELECT. */
    server.slaveseldb = -1;

    /* Slaves that approached us with the old SYNC command do not expect
     * any reply here. */
    if (slave->flags & CLIENT_PRE_PSYNC) return C_OK;

    replylen = snprintf(reply,sizeof(reply),"+FULLRESYNC %s %lld\r\n",
                        server.replid,offset);
    if (connWrite(slave->conn,reply,replylen) != replylen) {
        freeClientAsync(slave);
        return C_ERR;
    }
    return C_OK;
}

/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return C_OK, otherwise C_ERR is returned and we proceed
 * with the usual full resync.
*/ int masterTryPartialResynchronization(client *c) { long long psync_offset, psync_len; char *master_replid = c->argv[1]->ptr; char buf[128]; int buflen; /* Parse the replication offset asked by the slave. Go to full sync * on parse error: this should never happen but we try to handle * it in a robust way compared to aborting. */ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK) goto need_full_resync; /* Is the replication ID of this master the same advertised by the wannabe * slave via PSYNC? If the replication ID changed this master has a * different replication history, and there is no way to continue. * * Note that there are two potentially valid replication IDs: the ID1 * and the ID2. The ID2 however is only valid up to a specific offset. */ if (strcasecmp(master_replid, server.replid) && (strcasecmp(master_replid, server.replid2) || psync_offset > server.second_replid_offset)) { /* Replid "?" is used by slaves that want to force a full resync. */ if (master_replid[0] != '?') { if (strcasecmp(master_replid, server.replid) && strcasecmp(master_replid, server.replid2)) { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Replication ID mismatch (Replica asked for '%s', my " "replication IDs are '%s' and '%s')", master_replid, server.replid, server.replid2); } else { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Requested offset for second ID was %lld, but I can reply " "up to %lld", psync_offset, server.second_replid_offset); } } else { serverLog(LL_NOTICE,"Full resync requested by replica %s", replicationGetSlaveName(c)); } goto need_full_resync; } /* We still have the data our slave is asking for? 
*/ if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { serverLog(LL_NOTICE, "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > server.master_repl_offset) { serverLog(LL_WARNING, "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; } /* If we reached this point, we are able to perform a partial resync: * 1) Set client state to make it a slave. * 2) Inform the client we can continue with +CONTINUE * 3) Send the backlog data (from the offset to the end) to the slave. */ c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = server.unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ if (c->slave_capa & SLAVE_CAPA_PSYNC2) { buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); } else { buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); } if (connWrite(c->conn,buf,buflen) != buflen) { freeClientAsync(c); return C_OK; } psync_len = addReplyReplicationBacklog(c,psync_offset); serverLog(LL_NOTICE, "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset); /* Note that we don't need to set the selected DB at server.slaveseldb * to -1 to force the master to emit SELECT, since the slave already * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); /* Fire the replica change modules event. 
*/ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL); return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: /* We need a full resync for some reason... Note that we can't * reply to PSYNC right now if a full SYNC is needed. The reply * must include the master offset at the time the RDB file we transfer * is generated, so we need to delay the reply to that moment. */ return C_ERR; } /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. * * The mincapa argument is the bitwise AND among all the slaves capabilities * of the slaves waiting for this BGSAVE, so represents the slave capabilities * all the slaves support. Can be tested via SLAVE_CAPA_* macros. * * Side effects, other than starting a BGSAVE: * * 1) Handle the slaves in WAIT_START state, by preparing them for a full * sync if the BGSAVE was successfully started, or sending them an error * and dropping them from the list of slaves. * * 2) Flush the Lua scripting script cache if the BGSAVE was actually * started. * * Returns C_OK on success or C_ERR otherwise. */ int startBgsaveForReplication(int mincapa) { int retval; int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); listIter li; listNode *ln; serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk"); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); /* Only do rdbSave* when rsiptr is not NULL, * otherwise slave will miss repl-stream-db. */ if (rsiptr) { if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); else retval = rdbSaveBackground(server.rdb_filename,rsiptr); } else { serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. 
Try later."); retval = C_ERR; } /* If we succeeded to start a BGSAVE with disk target, let's remember * this fact, so that we can later delete the file if needed. Note * that we don't set the flag to 1 if the feature is disabled, otherwise * it would never be cleared: the file is not deleted. This way if * the user enables it later with CONFIG SET, we are fine. */ if (retval == C_OK && !socket_target && server.rdb_del_sync_files) RDBGeneratedByReplication = 1; /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchronization from the list of slaves, inform them with * an error about what happened, close the connection ASAP. */ if (retval == C_ERR) { serverLog(LL_WARNING,"BGSAVE for replication failed"); listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { slave->replstate = REPL_STATE_NONE; slave->flags &= ~CLIENT_SLAVE; listDelNode(server.slaves,ln); addReplyError(slave, "BGSAVE failed, replication can't continue"); slave->flags |= CLIENT_CLOSE_AFTER_REPLY; } } return retval; } /* If the target is socket, rdbSaveToSlavesSockets() already setup * the slaves for a full resync. Otherwise for disk target do it now.*/ if (!socket_target) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); } } } /* Flush the script cache, since we need that slave differences are * accumulated without requiring slaves to match our cached scripts. */ if (retval == C_OK) replicationScriptCacheFlush(); return retval; } /* SYNC and PSYNC command implementation. */ void syncCommand(client *c) { /* ignore SYNC if already slave or in monitor mode */ if (c->flags & CLIENT_SLAVE) return; /* Check if this is a failover request to a replica with the same replid and * become a master if so. 
*/ if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") && !strcasecmp(c->argv[3]->ptr,"failover")) { serverLog(LL_WARNING, "Failover request received for replid %s.", (unsigned char *)c->argv[1]->ptr); if (!server.masterhost) { addReplyError(c, "PSYNC FAILOVER can't be sent to a master."); return; } if (!strcasecmp(c->argv[1]->ptr,server.replid)) { replicationUnsetMaster(); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE, "MASTER MODE enabled (failover request from '%s')",client); sdsfree(client); } else { addReplyError(c, "PSYNC FAILOVER replid must match my replid."); return; } } /* Don't let replicas sync with us while we're failing over */ if (server.failover_state != NO_FAILOVER) { addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over"); return; } /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master"); return; } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } serverLog(LL_NOTICE,"Replica %s asks for synchronization", replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens replicationSetupSlaveForFullResync will replied * with: * * +FULLRESYNC * * So the slave knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. 
*/ if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not able to partially * resync. */ if (master_replid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation * of the replication protocol (like redis-cli --slave). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ c->flags |= CLIENT_PRE_PSYNC; } /* Full resynchronization. */ server.stat_sync_full++; /* Setup the slave as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the slave differently. */ c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); /* Create the replication backlog if needed. */ if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { /* When we create the backlog from scratch, we always use a new * replication ID and clear the ID2, since there is no valid * past history. */ changeReplicationId(); clearReplicationId2(); createReplicationBacklog(); serverLog(LL_NOTICE,"Replication backlog created, my new " "replication IDs are '%s' and '%s'", server.replid, server.replid2); } /* CASE 1: BGSAVE is in progress, with disk target. */ if (server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. 
*/ client *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; /* If the client needs a buffer of commands, we can't use * a replica without replication buffer. */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && (!(slave->flags & CLIENT_REPL_RDBONLY) || (c->flags & CLIENT_REPL_RDBONLY))) break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. * We don't copy buffer if clients don't want. */ if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences. */ serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); } /* CASE 2: BGSAVE is in progress, with socket target. */ } else if (server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is in progress. */ } else { if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) && server.repl_diskless_sync_delay) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); } else { /* We don't have a BGSAVE in progress, let's start one. 
Diskless * or disk-based mode is determined by replica's capacity. */ if (!hasActiveChildProcess()) { startBgsaveForReplication(c->slave_capa); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } } return; } /* REPLCONF