summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/rdb.c 1
-rw-r--r-- src/replication.c 59
-rw-r--r-- src/server.h 5
-rw-r--r-- src/syncio.c 13
4 files changed, 61 insertions, 17 deletions
diff --git a/src/rdb.c b/src/rdb.c
index 4c5fb21e7..324206561 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1572,6 +1572,7 @@ int rdbSaveToSlavesSockets(void) {
clientids[numfds] = slave->id;
fds[numfds++] = slave->fd;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,getPsyncInitialOffset());
/* Put the socket in non-blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
diff --git a/src/replication.c b/src/replication.c
index 202342793..72f29a072 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -349,6 +349,41 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
return server.repl_backlog_histlen - skip;
}
+/* Return the offset to provide as reply to the PSYNC command received
+ * from the slave. The returned value is only valid immediately after
+ * the BGSAVE process started and before executing any other command
+ * from clients. */
+long long getPsyncInitialOffset(void) {
+ long long psync_offset = server.master_repl_offset;
+ /* Add 1 to psync_offset if the replication backlog does not exist,
+ * since when it is created later we'll increment the offset by one. */
+ if (server.repl_backlog == NULL) psync_offset++;
+ return psync_offset;
+}
+
+/* Send a PSYNC reply in the specific case of a full resynchronization.
+ * As a side effect, store in the slave client structure the offset
+ * we sent here, so that if new slaves later attach to the same
+ * background RDB saving process (by duplicating this client's output
+ * buffer), we can get the right offset from this slave. */
+int replicationSendFullresyncReply(client *slave, long long offset) {
+ char buf[128];
+ int buflen;
+
+ slave->psync_initial_offset = offset;
+ /* Don't send this reply to slaves that approached us with
+ * the old SYNC command. */
+ if (!(slave->flags & CLIENT_PRE_PSYNC)) {
+ buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
+ server.runid,offset);
+ if (write(slave->fd,buf,buflen) != buflen) {
+ freeClientAsync(slave);
+ return C_ERR;
+ }
+ }
+ return C_OK;
+}
+
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
@@ -422,18 +457,10 @@ int masterTryPartialResynchronization(client *c) {
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
- /* We need a full resync for some reason... notify the client. */
- psync_offset = server.master_repl_offset;
- /* Add 1 to psync_offset if it the replication backlog does not exists
- * as when it will be created later we'll increment the offset by one. */
- if (server.repl_backlog == NULL) psync_offset++;
- /* Again, we can't use the connection buffers (see above). */
- buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
- server.runid,psync_offset);
- if (write(c->fd,buf,buflen) != buflen) {
- freeClientAsync(c);
- return C_OK;
- }
+ /* We need a full resync for some reason... Note that we can't
+ * reply to PSYNC right now if a full SYNC is needed. The reply
+ * must include the master offset at the time the RDB file we transfer
+ * is generated, so we need to delay the reply to that moment. */
return C_ERR;
}
@@ -537,6 +564,7 @@ void syncCommand(client *c) {
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
@@ -568,6 +596,7 @@ void syncCommand(client *c) {
return;
}
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(c,getPsyncInitialOffset());
}
}
@@ -755,6 +784,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,getPsyncInitialOffset());
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
@@ -2117,8 +2147,11 @@ void replicationCron(void) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START)
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+ replicationSendFullresyncReply(slave,
+ getPsyncInitialOffset());
+ }
}
}
}
diff --git a/src/server.h b/src/server.h
index c3bea54ea..bbd0014e1 100644
--- a/src/server.h
+++ b/src/server.h
@@ -564,6 +564,9 @@ typedef struct client {
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
+ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
+ copying this slave output buffer
+ should use. */
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
@@ -1198,6 +1201,8 @@ int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
+long long getPsyncInitialOffset(void);
+int replicationSendFullresyncReply(client *slave, long long offset);
/* Generic persistence functions */
void startLoading(FILE *fp);
diff --git a/src/syncio.c b/src/syncio.c
index b2843d5fb..48e0a0b79 100644
--- a/src/syncio.c
+++ b/src/syncio.c
@@ -118,7 +118,9 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
}
/* Read a line making sure that every char will not require more than 'timeout'
- * milliseconds to be read.
+ * milliseconds to be read. Empty newlines before the first non-empty line
+ * are ignored. This is useful because Redis sometimes sends empty
+ * newlines in order to keep the connection "alive".
*
* On success the number of bytes read is returned, otherwise -1.
* On success the string is always correctly terminated with a 0 byte. */
@@ -131,9 +133,12 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
if (syncRead(fd,&c,1,timeout) == -1) return -1;
if (c == '\n') {
- *ptr = '\0';
- if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
- return nread;
+ /* Ignore empty lines, otherwise return to the caller. */
+ if (nread != 0) {
+ *ptr = '\0';
+ if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
+ return nread;
+ }
} else {
*ptr++ = c;
*ptr = '\0';