summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: antirez <antirez@gmail.com> 2015-08-04 16:56:00 +0200
committer: antirez <antirez@gmail.com> 2015-08-04 17:06:10 +0200
commit: 292fec058a32323d5aa52dddfa86be280e29fe65 (patch)
tree: f728ed2b4687d008f4e03ae908359922286eb5b5
parent: d1ff328170a161fc002e47954e5dd0e0989d2ce9 (diff)
download: redis-292fec058a32323d5aa52dddfa86be280e29fe65.tar.gz
PSYNC initial offset fix.
This commit attempts to fix a bug involving PSYNC and diskless replication (currently experimental) found by Yuval Inbar from Redis Labs and that was later found to have even more far-reaching effects (the bug also exists when diskless replication is off). The gist of the bug is that a Redis master replies with +FULLRESYNC to a PSYNC attempt that fails and requires a full resynchronization. However, the baseline offset sent along with FULLRESYNC was always the current master replication offset. This is not ok, because there are many reasons that may delay the RDB file creation. And... guess what, the master offset we communicate must be the one of the time the RDB was created. So for example: 1) When the BGSAVE for replication is delayed since there is one already in progress but it is not good for replication. 2) When the BGSAVE is not needed as we attach to one currently ongoing. 3) When because of diskless replication the BGSAVE is delayed. In all the above cases the PSYNC reply is wrong and the slave may reconnect later claiming to need a wrong offset: this may cause data corruption later.
-rw-r--r-- src/rdb.c 1
-rw-r--r-- src/replication.c 59
-rw-r--r-- src/server.h 5
-rw-r--r-- src/syncio.c 13
4 files changed, 61 insertions, 17 deletions
diff --git a/src/rdb.c b/src/rdb.c
index 4c5fb21e7..324206561 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1572,6 +1572,7 @@ int rdbSaveToSlavesSockets(void) {
clientids[numfds] = slave->id;
fds[numfds++] = slave->fd;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,getPsyncInitialOffset());
/* Put the socket in non-blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
diff --git a/src/replication.c b/src/replication.c
index 202342793..72f29a072 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -349,6 +349,41 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
return server.repl_backlog_histlen - skip;
}
+/* Return the offset to send in the +FULLRESYNC reply to a slave's PSYNC:
+ * the current server.master_repl_offset, plus one when no backlog exists.
+ * The returned value is only valid immediately after the BGSAVE process
+ * started and before executing any other command from clients. */
+long long getPsyncInitialOffset(void) {
+ long long psync_offset = server.master_repl_offset;
+ /* Add 1 to psync_offset if the replication backlog does not exist yet:
+ * when it is created later the offset will be incremented by one. */
+ if (server.repl_backlog == NULL) psync_offset++;
+ return psync_offset;
+}
+
+/* Send the "+FULLRESYNC <runid> <offset>" reply used on a full resync.
+ * The offset is always saved in slave->psync_initial_offset, so that slaves
+ * attaching later to the same background RDB save (by duplicating this
+ * client's output buffer) can be sent the same offset. Returns C_OK, or
+ * C_ERR after calling freeClientAsync() when the write is not complete. */
+int replicationSendFullresyncReply(client *slave, long long offset) {
+ char buf[128];
+ int buflen;
+
+ slave->psync_initial_offset = offset;
+ /* Slaves that approached us with the old SYNC command get no reply
+ * on the socket; the offset above is still recorded for them. */
+ if (!(slave->flags & CLIENT_PRE_PSYNC)) {
+ buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
+ server.runid,offset);
+ if (write(slave->fd,buf,buflen) != buflen) {
+ freeClientAsync(slave);
+ return C_ERR;
+ }
+ }
+ return C_OK;
+}
+
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
@@ -422,18 +457,10 @@ int masterTryPartialResynchronization(client *c) {
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
- /* We need a full resync for some reason... notify the client. */
- psync_offset = server.master_repl_offset;
- /* Add 1 to psync_offset if it the replication backlog does not exists
- * as when it will be created later we'll increment the offset by one. */
- if (server.repl_backlog == NULL) psync_offset++;
- /* Again, we can't use the connection buffers (see above). */
- buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
- server.runid,psync_offset);
- if (write(c->fd,buf,buflen) != buflen) {
- freeClientAsync(c);
- return C_OK;
- }
+ /* We need a full resync for some reason... Note that we can't
+ * reply to PSYNC right now if a full SYNC is needed. The reply
+ * must include the master offset at the time the RDB file we transfer
+ * is generated, so we need to delay the reply to that moment. */
return C_ERR;
}
@@ -537,6 +564,7 @@ void syncCommand(client *c) {
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
@@ -568,6 +596,7 @@ void syncCommand(client *c) {
return;
}
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(c,getPsyncInitialOffset());
}
}
@@ -755,6 +784,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,getPsyncInitialOffset());
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
@@ -2117,8 +2147,11 @@ void replicationCron(void) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START)
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,
+ getPsyncInitialOffset());
+ }
}
}
}
diff --git a/src/server.h b/src/server.h
index c3bea54ea..bbd0014e1 100644
--- a/src/server.h
+++ b/src/server.h
@@ -564,6 +564,9 @@ typedef struct client {
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
+ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
+ copying this slave output buffer
+ should use. */
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
@@ -1198,6 +1201,8 @@ int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
+long long getPsyncInitialOffset(void);
+int replicationSendFullresyncReply(client *slave, long long offset);
/* Generic persistence functions */
void startLoading(FILE *fp);
diff --git a/src/syncio.c b/src/syncio.c
index b2843d5fb..48e0a0b79 100644
--- a/src/syncio.c
+++ b/src/syncio.c
@@ -118,7 +118,9 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
}
/* Read a line making sure that every char will not require more than 'timeout'
- * milliseconds to be read.
+ * milliseconds to be read. Empty newlines before the first non-empty line
+ * are ignored. This is useful because Redis sometimes sends empty
+ * newlines in order to keep the connection alive.
*
* On success the number of bytes read is returned, otherwise -1.
* On success the string is always correctly terminated with a 0 byte. */
@@ -131,9 +133,12 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
if (syncRead(fd,&c,1,timeout) == -1) return -1;
if (c == '\n') {
- *ptr = '\0';
- if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
- return nread;
+ /* Ignore empty lines, otherwise return to the caller. */
+ if (nread != 0) {
+ *ptr = '\0';
+ if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
+ return nread;
+ }
} else {
*ptr++ = c;
*ptr = '\0';