summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf4
-rw-r--r--src/aof.c4
-rw-r--r--src/db.c2
-rw-r--r--src/debug.c4
-rw-r--r--src/networking.c26
-rw-r--r--src/rdb.c49
-rw-r--r--src/rdb.h10
-rw-r--r--src/replication.c418
-rw-r--r--src/server.c19
-rw-r--r--src/server.h47
10 files changed, 440 insertions, 143 deletions
diff --git a/redis.conf b/redis.conf
index a7b7f3a97..bce5332e0 100644
--- a/redis.conf
+++ b/redis.conf
@@ -402,6 +402,10 @@ repl-disable-tcp-nodelay no
# need to elapse, starting from the time the last slave disconnected, for
# the backlog buffer to be freed.
#
+# Note that slaves never free the backlog for timeout, since they may be
+# promoted to masters later, and should be able to correctly "partially
+# resynchronize" with the slaves: hence they should always accumulate backlog.
+#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600
diff --git a/src/aof.c b/src/aof.c
index c75153cc7..07d8561da 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
- if (rdbLoadRio(&rdb) != C_OK) {
+ if (rdbLoadRio(&rdb,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@@ -1152,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
- if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
+ if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
diff --git a/src/db.c b/src/db.c
index 268e7c384..55ae663c2 100644
--- a/src/db.c
+++ b/src/db.c
@@ -413,7 +413,7 @@ void flushallCommand(client *c) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
- rdbSave(server.rdb_filename);
+ rdbSave(server.rdb_filename,NULL);
server.dirty = saved_dirty;
}
server.dirty++;
diff --git a/src/debug.c b/src/debug.c
index d48caedcc..f4689d532 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -320,12 +320,12 @@ void debugCommand(client *c) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
serverAssertWithInfo(c,c->argv[0],1 == 2);
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
- if (rdbSave(server.rdb_filename) != C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) != C_OK) {
addReply(c,shared.err);
return;
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
- if (rdbLoad(server.rdb_filename) != C_OK) {
+ if (rdbLoad(server.rdb_filename,NULL) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
}
diff --git a/src/networking.c b/src/networking.c
index 2be40ae15..b2cec8631 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -352,6 +352,14 @@ void addReplySds(client *c, sds s) {
}
}
+/* This low level function just adds whatever protocol you send it to the
+ * client buffer, trying the static buffer initially, and using the string
+ * of objects if not possible.
+ *
+ * It is efficient because does not create an SDS object nor an Redis object
+ * if not needed. The object will only be created by calling
+ * _addReplyStringToList() if we fail to extend the existing tail object
+ * in the list of objects. */
void addReplyString(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
if (_addReplyToBuffer(c,s,len) != C_OK)
@@ -1022,7 +1030,7 @@ int processInlineBuffer(client *c) {
char *newline;
int argc, j;
sds *argv, aux;
- size_t querylen;
+ size_t querylen, protolen;
/* Search for end of line */
newline = strchr(c->querybuf,'\n');
@@ -1035,6 +1043,7 @@ int processInlineBuffer(client *c) {
}
return C_ERR;
}
+ protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */
/* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
@@ -1057,6 +1066,15 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime;
+ /* Newline from masters can be used to prevent timeouts, but should
+ * not affect the replication offset since they are always sent
+ * "out of band" directly writing to the socket and without passing
+ * from the output buffers. */
+ if (querylen == 0 && c->flags & CLIENT_MASTER) {
+ c->reploff -= protolen;
+ while (protolen--) chopReplicationBacklog();
+ }
+
/* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1);
@@ -1321,7 +1339,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) c->reploff += nread;
+ if (c->flags & CLIENT_MASTER) {
+ c->reploff += nread;
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->querybuf+qblen,nread);
+ }
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
diff --git a/src/rdb.c b/src/rdb.c
index 29f880dac..aa9c631de 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -835,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb, int flags) {
+int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
@@ -844,6 +844,16 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
+
+ /* Handle saving options that generate aux fields. */
+ if (rsi) {
+ if (rsi->repl_stream_db &&
+ rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
+ == -1)
+ {
+ return -1;
+ }
+ }
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1;
}
@@ -856,7 +866,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int flags) {
+int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -869,7 +879,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -945,7 +955,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
-int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
+int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@@ -953,7 +963,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@@ -964,7 +974,7 @@ werr: /* Write error. */
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
-int rdbSave(char *filename) {
+int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@@ -985,7 +995,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
- if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1023,7 +1033,7 @@ werr:
return C_ERR;
}
-int rdbSaveBackground(char *filename) {
+int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
@@ -1040,7 +1050,7 @@ int rdbSaveBackground(char *filename) {
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
- retval = rdbSave(filename);
+ retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
@@ -1410,7 +1420,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb) {
+int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -1501,6 +1511,8 @@ int rdbLoadRio(rio *rdb) {
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
+ } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
+ if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@@ -1559,8 +1571,11 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
* filename is open for reading and a rio stream object created in order
* to do the actual loading. Moreover the ETA displayed in the INFO
- * output is initialized and finalized. */
-int rdbLoad(char *filename) {
+ * output is initialized and finalized.
+ *
+ * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
+ * loading code will fiil the information fields in the structure. */
+int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;
@@ -1568,7 +1583,7 @@ int rdbLoad(char *filename) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb);
+ retval = rdbLoadRio(&rdb,rsi);
fclose(fp);
stopLoading();
return retval;
@@ -1721,7 +1736,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
-int rdbSaveToSlavesSockets(void) {
+int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int *fds;
uint64_t *clientids;
int numfds;
@@ -1779,7 +1794,7 @@ int rdbSaveToSlavesSockets(void) {
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");
- retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
+ retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;
@@ -1884,7 +1899,7 @@ void saveCommand(client *c) {
addReplyError(c,"Background save already in progress");
return;
}
- if (rdbSave(server.rdb_filename) == C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
@@ -1918,7 +1933,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver "
"possible.");
}
- } else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
+ } else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
diff --git a/src/rdb.h b/src/rdb.h
index 60c52a7c1..efe932255 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
-int rdbLoad(char *filename);
-int rdbSaveBackground(char *filename);
-int rdbSaveToSlavesSockets(void);
+int rdbLoad(char *filename, rdbSaveInfo *rsi);
+int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
+int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
-int rdbSave(char *filename);
+int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb);
@@ -136,6 +136,6 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
-int rdbLoadRio(rio *rdb);
+int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi);
#endif
diff --git a/src/replication.c b/src/replication.c
index 67091dd0b..a98d0d35e 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -39,6 +39,7 @@
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
+void replicationCacheMasterUsingMyself(void);
void replicationSendAck(void);
void putSlaveOnline(client *slave);
int cancelReplicationHandshake(void);
@@ -79,11 +80,6 @@ void createReplicationBacklog(void) {
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
- /* When a new backlog buffer is created, we increment the replication
- * offset by one to make sure we'll not be able to PSYNC with any
- * previous slave. This is needed because we avoid incrementing the
- * master_repl_offset if no backlog exists nor slaves are attached. */
- server.master_repl_offset++;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
@@ -153,6 +149,22 @@ void feedReplicationBacklog(void *ptr, size_t len) {
server.repl_backlog_histlen + 1;
}
+/* Remove the last byte from the replication backlog. This
+ * is useful when we receive an out of band "\n" to keep the connection
+ * alive but don't want to count it as replication stream.
+ *
+ * As a side effect this function adjusts the master replication offset
+ * of this instance to account for the missing byte. */
+void chopReplicationBacklog(void) {
+ if (!server.repl_backlog || !server.repl_backlog_histlen) return;
+ if (server.repl_backlog_idx == 0)
+ server.repl_backlog_idx = server.repl_backlog_size-1;
+ else
+ server.repl_backlog_idx--;
+ server.master_repl_offset--;
+ server.repl_backlog_histlen--;
+}
+
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
void feedReplicationBacklogWithObject(robj *o) {
@@ -170,12 +182,24 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
+/* Propagate write commands to slaves, and populate the replication backlog
+ * as well. This function is used if the instance is a master: we use
+ * the commands received by our clients in order to create the replication
+ * stream. Instead if the instance is a slave and has sub-slaves attached,
+ * we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
+ /* If the instance is not a top level master, return ASAP: we'll just proxy
+ * the stream of data we receive from our master instead, in order to
+ * propagate *identical* replication stream. In this way this slave can
+ * advertise the same replication ID as the master (since it shares the
+ * master replication history and has the same backlog and offsets). */
+ if (server.masterhost != NULL) return;
+
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
@@ -265,6 +289,32 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
}
+/* This function is used in order to proxy what we receive from our master
+ * to our sub-slaves. */
+#include <ctype.h>
+void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
+ listNode *ln;
+ listIter li;
+
+ {
+ printf("%zu:",buflen);
+ for (size_t j = 0; j < buflen; j++) {
+ printf("%c", isprint(buf[j]) ? buf[j] : '.');
+ }
+ printf("\n");
+ }
+
+ if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
+ listRewind(slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+
+ /* Don't feed slaves that are still waiting for BGSAVE to start */
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+ addReplyString(slave,buf,buflen);
+ }
+}
+
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
@@ -329,7 +379,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
- /* Point j to the oldest byte, that is actaully our
+ /* Point j to the oldest byte, that is actually our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
@@ -361,18 +411,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
* the BGSAVE process started and before executing any other command
* from clients. */
long long getPsyncInitialOffset(void) {
- long long psync_offset = server.master_repl_offset;
- /* Add 1 to psync_offset if it the replication backlog does not exists
- * as when it will be created later we'll increment the offset by one. */
- if (server.repl_backlog == NULL) psync_offset++;
- return psync_offset;
+ return server.master_repl_offset;
}
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways:
*
- * 1) Remember, into the slave client structure, the offset we sent
- * here, so that if new slaves will later attach to the same
+ * 1) Remember, into the slave client structure, the replication offset
+ * we sent here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
@@ -392,14 +438,14 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
- * a SLEECT statement in the replication stream. */
+ * a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
- server.runid,offset);
+ server.replid,offset);
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
@@ -415,19 +461,32 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
* with the usual full resync. */
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
- char *master_runid = c->argv[1]->ptr;
+ char *master_replid = c->argv[1]->ptr;
char buf[128];
int buflen;
- /* Is the runid of this master the same advertised by the wannabe slave
- * via PSYNC? If runid changed this master is a different instance and
- * there is no way to continue. */
- if (strcasecmp(master_runid, server.runid)) {
+ /* Parse the replication offset asked by the slave. Go to full sync
+ * on parse error: this should never happen but we try to handle
+ * it in a robust way compared to aborting. */
+ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
+ C_OK) goto need_full_resync;
+
+ /* Is the replication ID of this master the same advertised by the wannabe
+ * slave via PSYNC? If the replication ID changed this master has a
+ * different replication history, and there is no way to continue.
+ *
+ * Note that there are two potentially valid replication IDs: the ID1
+ * and the ID2. The ID2 however is only valid up to a specific offset. */
+ if (strcasecmp(master_replid, server.replid) &&
+ (strcasecmp(master_replid, server.replid2) ||
+ psync_offset > server.second_replid_offset))
+ {
/* Run id "?" is used by slaves that want to force a full resync. */
- if (master_runid[0] != '?') {
+ if (master_replid[0] != '?') {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
- "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
- master_runid, server.runid);
+ "Replication ID mismatch (Slave asked for '%s', my replication "
+ "ID is '%s')",
+ master_replid, server.replid);
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
@@ -436,8 +495,6 @@ int masterTryPartialResynchronization(client *c) {
}
/* We still have the data our slave is asking for? */
- if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
- C_OK) goto need_full_resync;
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
@@ -463,7 +520,11 @@ int masterTryPartialResynchronization(client *c) {
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
- buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
+ if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
+ buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
+ } else {
+ buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
+ }
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
@@ -515,10 +576,18 @@ int startBgsaveForReplication(int mincapa) {
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "slaves sockets" : "disk");
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ /* If we are saving for a chained slave (that is, if we are,
+ * in turn, a slave of another instance), make sure after
+ * loadig the RDB, our slaves select the right DB: we'll just
+ * send the replication stream we receive from our master, so
+ * no way to send SELECT commands. */
+ if (server.master) rsi.repl_stream_db = server.master->db->id;
+
if (socket_target)
- retval = rdbSaveToSlavesSockets();
+ retval = rdbSaveToSlavesSockets(&rsi);
else
- retval = rdbSaveBackground(server.rdb_filename);
+ retval = rdbSaveBackground(server.rdb_filename,&rsi);
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchorinization from the list of salves, inform them with
@@ -589,22 +658,22 @@ void syncCommand(client *c) {
* when this happens masterTryPartialResynchronization() already
* replied with:
*
- * +FULLRESYNC <runid> <offset>
+ * +FULLRESYNC <replid> <offset>
*
- * So the slave knows the new runid and offset to try a PSYNC later
+ * So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
- char *master_runid = c->argv[1]->ptr;
+ char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
- * runid is not "?", as this is used by slaves to force a full
+ * replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
- if (master_runid[0] != '?') server.stat_sync_partial_err++;
+ if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
@@ -625,6 +694,16 @@ void syncCommand(client *c) {
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
+ /* Create the replication backlog if needed. */
+ if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
+ /* When we create the backlog from scratch, we always use a new
+ * replication ID and clear the ID2, since there is no valid
+ * past history. */
+ changeReplicationId();
+ clearReplicationId2();
+ createReplicationBacklog();
+ }
+
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
@@ -685,9 +764,6 @@ void syncCommand(client *c) {
}
}
}
-
- if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
- createReplicationBacklog();
return;
}
@@ -735,6 +811,8 @@ void replconfCommand(client *c) {
/* Ignore capabilities not understood by this master. */
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
+ else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
+ c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@@ -928,6 +1006,43 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (startbgsave) startBgsaveForReplication(mincapa);
}
+/* Change the current instance replication ID with a new, random one.
+ * This will prevent successful PSYNCs between this master and other
+ * slaves, so the command should be called when something happens that
+ * alters the current story of the dataset. */
+void changeReplicationId(void) {
+ getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
+ server.replid[CONFIG_RUN_ID_SIZE] = '\0';
+}
+
+/* Clear (invalidate) the secondary replication ID. This happens, for
+ * example, after a full resynchronization, when we start a new replication
+ * history. */
+void clearReplicationId2(void) {
+ memset(server.replid2,'0',sizeof(server.replid));
+ server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
+ server.second_replid_offset = -1;
+}
+
+/* Use the current replication ID / offset as secondary replication
+ * ID, and change the current one in order to start a new history.
+ * This should be used when an instance is switched from slave to master
+ * so that it can serve PSYNC requests performed using the master
+ * replication ID. */
+void shiftReplicationId(void) {
+ memcpy(server.replid2,server.replid,sizeof(server.replid));
+ /* We set the second replid offset to the master offset + 1, since
+ * the slave will ask for the first byte it has not yet received, so
+ * we need to add one to the offset: for example if, as a slave, we are
+ * sure we have the same history as the master for 50 bytes, after we
+ * are turned into a master, we can accept a PSYNC request with offset
+ * 51, since the slave asking has the same history up to the 50th
+ * byte, and is asking for the new bytes starting at offset 51. */
+ server.second_replid_offset = server.master_repl_offset+1;
+ changeReplicationId();
+ serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
+}
+
/* ----------------------------------- SLAVE -------------------------------- */
/* Returns 1 if the given replication state is a handshake state,
@@ -965,18 +1080,18 @@ void replicationEmptyDbCallback(void *privdata) {
/* Once we have a link with the master and the synchroniziation was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
-void replicationCreateMasterClient(int fd) {
+void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
- server.repl_state = REPL_STATE_CONNECTED;
- server.master->reploff = server.repl_master_initial_offset;
- memcpy(server.master->replrunid, server.repl_master_runid,
- sizeof(server.repl_master_runid));
+ server.master->reploff = server.master_initial_offset;
+ memcpy(server.master->replid, server.master_replid,
+ sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
+ if (dbid != -1) selectDb(server.master,dbid);
}
/* Asynchronously read the SYNC payload we receive from a master */
@@ -1137,7 +1252,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
- if (rdbLoad(server.rdb_filename) != C_OK) {
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
return;
@@ -1145,7 +1261,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
- replicationCreateMasterClient(server.repl_transfer_s);
+ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
+ server.repl_state = REPL_STATE_CONNECTED;
+ /* After a full resynchroniziation we use the replication ID and
+ * offset of the master. The secondary ID / offset are cleared since
+ * we are starting a new history. */
+ memcpy(server.replid,server.master->replid,sizeof(server.replid));
+ server.master_repl_offset = server.master->reploff;
+ clearReplicationId2();
+ /* Let's create the replication backlog if needed. Slaves need to
+ * accumulate the backlog regardless of the fact they have sub-slaves
+ * or not, in order to behave correctly if they are promoted to
+ * masters after a failover. */
+ if (server.repl_backlog == NULL) createReplicationBacklog();
+
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
@@ -1270,7 +1399,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
- * 2) server.repl_master_initial_offset is set to the right value according
+ * 2) server.master_initial_offset is set to the right value according
* to the master reply. This will be used to populate the 'server.master'
* structure replication offset.
*/
@@ -1281,31 +1410,31 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
int slaveTryPartialResynchronization(int fd, int read_reply) {
- char *psync_runid;
+ char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
- /* Initially set repl_master_initial_offset to -1 to mark the current
+ /* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
- server.repl_master_initial_offset = -1;
+ server.master_initial_offset = -1;
if (server.cached_master) {
- psync_runid = server.cached_master->replrunid;
+ psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
- serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
+ serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
- psync_runid = "?";
+ psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
- reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
+ reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
@@ -1327,31 +1456,31 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
aeDeleteFileEvent(server.el,fd,AE_READABLE);
if (!strncmp(reply,"+FULLRESYNC",11)) {
- char *runid = NULL, *offset = NULL;
+ char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
- runid = strchr(reply,' ');
- if (runid) {
- runid++;
- offset = strchr(runid,' ');
+ replid = strchr(reply,' ');
+ if (replid) {
+ replid++;
+ offset = strchr(replid,' ');
if (offset) offset++;
}
- if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
+ if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
- * runid to make sure next PSYNCs will fail. */
- memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
+ * replid to make sure next PSYNCs will fail. */
+ memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
- memcpy(server.repl_master_runid, runid, offset-runid-1);
- server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
- server.repl_master_initial_offset = strtoll(offset,NULL,10);
+ memcpy(server.master_replid, replid, offset-replid-1);
+ server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
+ server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
- server.repl_master_runid,
- server.repl_master_initial_offset);
+ server.master_replid,
+ server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
@@ -1360,9 +1489,40 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
}
if (!strncmp(reply,"+CONTINUE",9)) {
- /* Partial resync was accepted, set the replication state accordingly */
+ /* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
+
+ /* Check the new replication ID advertised by the master. If it
+ * changed, we need to set the new ID as primary ID, and set or
+ * secondary ID as the old master ID up to the current offset, so
+ * that our sub-slaves will be able to PSYNC with us after a
+ * disconnection. */
+ char *start = reply+10;
+ char *end = reply+9;
+ while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
+ if (end-start == CONFIG_RUN_ID_SIZE) {
+ char new[CONFIG_RUN_ID_SIZE+1];
+ memcpy(new,start,CONFIG_RUN_ID_SIZE);
+ new[CONFIG_RUN_ID_SIZE] = '\0';
+
+ if (strcmp(new,server.cached_master->replid)) {
+ /* Master ID changed. */
+ serverLog(LL_WARNING,"Master replication ID changed to %s",new);
+
+ /* Set the old ID as our ID2, up to the current offset+1. */
+ memcpy(server.replid2,server.cached_master->replid,
+ sizeof(server.replid2));
+ server.second_replid_offset = server.master_repl_offset+1;
+
+ /* Update the cached master ID and our own primary ID to the
+ * new one. */
+ memcpy(server.replid,new,sizeof(server.replid));
+ memcpy(server.cached_master->replid,new,sizeof(server.replid));
+ }
+ }
+
+ /* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
@@ -1386,6 +1546,8 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}
+/* This handler fires when the non blocking connect was able to
+ * establish a connection with the master. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err = NULL;
int dfd, maxtries = 5;
@@ -1402,7 +1564,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
- /* Check for errors in the socket. */
+ /* Check for errors in the socket: after a non blocking connect() we
+ * may find that the socket is in error state. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
@@ -1531,13 +1694,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_state = REPL_STATE_SEND_CAPA;
}
- /* Inform the master of our capabilities. While we currently send
- * just one capability, it is possible to chain new capabilities here
- * in the form of REPLCONF capa X capa Y capa Z ...
+ /* Inform the master of our (slave) capabilities.
+ *
+ * EOF: supports EOF-style RDB transfer for diskless replication.
+ * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
+ *
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
- "capa","eof",NULL);
+ "capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
@@ -1591,14 +1756,14 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* PSYNC failed or is not supported: we want our slaves to resync with us
- * as well, if we have any (chained replication case). The mater may
- * transfer us an entirely different data set and we have no way to
- * incrementally feed our slaves after that. */
+ * as well, if we have any sub-slaves. The mater may transfer us an
+ * entirely different data set and we have no way to incrementally feed
+ * our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
- * and the server.repl_master_runid and repl_master_initial_offset are
+ * and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
@@ -1727,15 +1892,23 @@ int cancelReplicationHandshake(void) {
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
+ int was_master = server.masterhost == NULL;
+
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
- if (server.master) freeClient(server.master);
+ if (server.master) {
+ freeClient(server.master);
+ }
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
- disconnectSlaves(); /* Force our slaves to resync with us as well. */
- replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
- freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
+
+ /* Force our slaves to resync with us as well. They may hopefully be able
+ * to partially resync with us, but we can notify the replid change. */
+ disconnectSlaves();
cancelReplicationHandshake();
+ /* Before destroying our master state, create a cached master using
+ * our own parameters, to later PSYNC with the new master. */
+ if (was_master) replicationCacheMasterUsingMyself();
server.repl_state = REPL_STATE_CONNECT;
server.master_repl_offset = 0;
server.repl_down_since = 0;
@@ -1746,20 +1919,26 @@ void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
sdsfree(server.masterhost);
server.masterhost = NULL;
- if (server.master) {
- if (listLength(server.slaves) == 0) {
- /* If this instance is turned into a master and there are no
- * slaves, it inherits the replication offset from the master.
- * Under certain conditions this makes replicas comparable by
- * replication offset to understand what is the most updated. */
- server.master_repl_offset = server.master->reploff;
- freeReplicationBacklog();
- }
- freeClient(server.master);
- }
+ /* When a slave is turned into a master, the current replication ID
+ * (that was inherited from the master at synchronization time) is
+ * used as secondary ID up to the current offset, and a new replication
+ * ID is created to continue with a new replication history. */
+ shiftReplicationId();
+ if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();
+ /* Disconnecting all the slaves is required: we need to inform slaves
+ * of the replication ID change (see shiftReplicationId() call). However
+ * the slaves will be able to partially resync with us, so it will be
+ * a very fast reconnection. */
+ disconnectSlaves();
server.repl_state = REPL_STATE_NONE;
+
+ /* We need to make sure the new master will start the replication stream
+ * with a SELECT statement. This is forced after a full resync, but
+ * with PSYNC version 2, there is no need for full resync after a
+ * master switch. */
+ server.slaveseldb = -1;
}
/* This function is called when the slave lose the connection with the
@@ -1931,6 +2110,31 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
+/* This function is called when a master is turend into a slave, in order to
+ * create from scratch a cached master for the new client, that will allow
+ * to PSYNC with the slave that was promoted as the new master after a
+ * failover.
+ *
+ * Assuming this instance was previously the master instance of the new master,
+ * the new master will accept its replication ID, and potentiall also the
+ * current offset if no data was lost during the failover. So we use our
+ * current replication ID and offset in order to synthesize a cached master. */
+void replicationCacheMasterUsingMyself(void) {
+ /* The master client we create can be set to any DBID, because
+ * the new master will start its replication stream with SELECT. */
+ server.master_initial_offset = server.master_repl_offset;
+ replicationCreateMasterClient(-1,-1);
+
+ /* Use our own ID / offset. */
+ memcpy(server.master->replid, server.replid, sizeof(server.replid));
+
+ /* Set as cached master. */
+ unlinkClient(server.master);
+ server.cached_master = server.master;
+ server.master = NULL;
+ serverLog(LL_NOTICE,"Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.");
+}
+
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
@@ -2290,7 +2494,9 @@ void replicationCron(void) {
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
- if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
+ if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
+ listLength(server.slaves))
+ {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
@@ -2299,20 +2505,32 @@ void replicationCron(void) {
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
+ *
+ * Also send the a newline to all the chained slaves we have, if we lost
+ * connection from our master, to keep the slaves aware that their
+ * master is online. This is needed since sub-slaves only receive proxied
+ * data from top-level masters, so there is no explicit pinging in order
+ * to avoid altering the replication offsets. This special out of band
+ * pings (newlines) can be sent, they will have no effect in the offset.
+ *
* The newline will be ignored by the slave but will refresh the
- * last-io timer preventing a timeout. In this case we ignore the
+ * last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
+ int is_presync =
+ (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
- server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
- {
+ server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
+ int is_subslave = server.masterhost && server.master == NULL &&
+ slave->replstate == SLAVE_STATE_ONLINE;
+
+ if (is_presync || is_subslave) {
if (write(slave->fd, "\n", 1) == -1) {
- /* Don't worry, it's just a ping. */
+ /* Don't worry about socket errors, it's just a ping. */
}
}
}
@@ -2337,10 +2555,14 @@ void replicationCron(void) {
}
}
- /* If we have no attached slaves and there is a replication backlog
- * using memory, free it after some (configured) time. */
+ /* If this is a master without attached slaves and there is a replication
+ * backlog active, in order to reclaim memory we can free it after some
+ * (configured) time. Note that this cannot be done for slaves: slaves
+ * without sub-slaves attached should still accumulate data into the
+ * backlog, in order to reply to PSYNC queries if they are turned into
+ * masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
- server.repl_backlog)
+ server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
diff --git a/src/server.c b/src/server.c
index 7e9b962b3..b94490a33 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1079,7 +1079,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
- rdbSaveBackground(server.rdb_filename);
+ rdbSaveBackground(server.rdb_filename,NULL);
break;
}
}
@@ -1151,7 +1151,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
- if (rdbSaveBackground(server.rdb_filename) == C_OK)
+ if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
@@ -1309,10 +1309,11 @@ void initServerConfig(void) {
int j;
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
+ server.runid[CONFIG_RUN_ID_SIZE] = '\0';
+ changeReplicationId();
server.configfile = NULL;
server.executable = NULL;
server.hz = CONFIG_DEFAULT_HZ;
- server.runid[CONFIG_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = CONFIG_DEFAULT_SERVER_PORT;
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
@@ -1409,7 +1410,7 @@ void initServerConfig(void) {
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
- server.repl_master_initial_offset = -1;
+ server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
@@ -2471,7 +2472,7 @@ int prepareForShutdown(int flags) {
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
- if (rdbSave(server.rdb_filename) != C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
@@ -3135,12 +3136,18 @@ sds genRedisInfoString(char *section) {
}
}
info = sdscatprintf(info,
+ "master_replid:%s\r\n"
+ "master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
+ "second_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
"repl_backlog_first_byte_offset:%lld\r\n"
"repl_backlog_histlen:%lld\r\n",
+ server.replid,
+ server.replid2,
server.master_repl_offset,
+ server.second_replid_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
server.repl_backlog_off,
@@ -3416,7 +3423,7 @@ void loadDataFromDisk(void) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
- if (rdbLoad(server.rdb_filename) == C_OK) {
+ if (rdbLoad(server.rdb_filename,NULL) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
} else if (errno != ENOENT) {
diff --git a/src/server.h b/src/server.h
index b5dbaf0a5..8aa1d6fcb 100644
--- a/src/server.h
+++ b/src/server.h
@@ -293,7 +293,8 @@ typedef long long mstime_t; /* millisecond time type. */
/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
-#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
+#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
+#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@@ -679,8 +680,8 @@ typedef struct client {
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
- char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
- int slave_listening_port; /* As configured with: REPLCONF listening-port */
+ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
+ int slave_listening_port; /* As configured with: SLAVECONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
@@ -803,6 +804,20 @@ struct redisMemOverhead {
} *db;
};
+/* This structure can be optionally passed to RDB save/load functions in
+ * order to implement additional functionalities, by storing and loading
+ * metadata to the RDB file.
+ *
+ * Currently the only use is to select a DB at load time, useful in
+ * replication in order to make sure that chained slaves (slaves of slaves)
+ * select the correct DB and are able to accept the stream coming from the
+ * top-level master. */
+typedef struct rdbSaveInfo {
+ int repl_stream_db; /* DB to select in server.master client. */
+} rdbSaveInfo;
+
+#define RDB_SAVE_INFO_INIT {-1}
+
/*-----------------------------------------------------------------------------
* Global server state
*----------------------------------------------------------------------------*/
@@ -988,15 +1003,19 @@ struct redisServer {
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
+ char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
+ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
+ long long master_repl_offset; /* My current replication offset */
+ long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
- long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
- long long repl_backlog_idx; /* Backlog circular buffer current offset */
- long long repl_backlog_off; /* Replication offset of first byte in the
- backlog buffer. */
+ long long repl_backlog_idx; /* Backlog circular buffer current offset,
+ that is the next byte will'll write to.*/
+ long long repl_backlog_off; /* Replication "master offset" of first
+ byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@@ -1029,8 +1048,11 @@ struct redisServer {
int slave_priority; /* Reported in INFO and used by Sentinel. */
int slave_announce_port; /* Give the master this listening port. */
char *slave_announce_ip; /* Give the master this ip address. */
- char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/
- long long repl_master_initial_offset; /* Master PSYNC offset. */
+ /* The following two fields is where we store master PSYNC replid/offset
+ * while the PSYNC is in progress. At the end we'll copy the fields into
+ * the server->master client structure. */
+ char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
+ long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
@@ -1259,6 +1281,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
+void addReplyString(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
@@ -1393,6 +1416,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
+void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@@ -1414,6 +1438,9 @@ long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
+void changeReplicationId(void);
+void clearReplicationId2(void);
+void chopReplicationBacklog(void);
/* Generic persistence functions */
void startLoading(FILE *fp);
@@ -1422,7 +1449,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
-int rdbSaveRio(rio *rdb, int *error, int flags);
+int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
/* AOF persistence */
void flushAppendOnlyFile(int force);