summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-11-11 17:12:12 +0100
committerantirez <antirez@gmail.com>2014-11-11 17:12:12 +0100
commitbb7fea0d5ca7b3a53532338e8654e409014c1194 (patch)
treed64d67df2e9777be866741578164ff262f3cd0a1
parentf5c6ebbfe3620d16c1e83c2ccaf1cec5a312aaae (diff)
downloadredis-bb7fea0d5ca7b3a53532338e8654e409014c1194.tar.gz
Diskless SYNC: fix RDB EOF detection.
RDB EOF detection relied on the final part of the RDB transfer being a magic 40-byte EOF marker. However, since the slave is put online immediately, and because of socket timeouts, the replication stream is actually contiguous with the RDB file. This means that to detect the EOF correctly we should either: 1) Scan the whole stream searching for the mark. Sucks CPU-wise. 2) Start sending the replication stream only after an acknowledgment. 3) Implement a proper chunked encoding. For now solution "2" was picked, so in the case of diskless replication the master does not start sending the stream of commands ASAP. We wait for the first REPLCONF ACK command from the slave, which confirms that the slave correctly loaded the RDB file and is ready to get more data.
-rw-r--r--src/networking.c1
-rw-r--r--src/redis.h1
-rw-r--r--src/replication.c23
3 files changed, 21 insertions, 4 deletions
diff --git a/src/networking.c b/src/networking.c
index cc9bbd98c..f10a1c5e2 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -100,6 +100,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
diff --git a/src/redis.h b/src/redis.h
index f5301ab26..855ae5742 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -532,6 +532,7 @@ typedef struct redisClient {
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
+ int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
diff --git a/src/replication.c b/src/replication.c
index 8e97a330a..c0e833263 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -40,6 +40,7 @@
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
void replicationSendAck(void);
+void putSlaveOnline(redisClient *slave);
/* --------------------------- Utility functions ---------------------------- */
@@ -398,6 +399,7 @@ int masterTryPartialResynchronization(redisClient *c) {
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
+ c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@@ -623,6 +625,11 @@ void replconfCommand(redisClient *c) {
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;
+ /* If this was a diskless replication, we need to really put
+ * the slave online when the first ACK is received (which
+ * confirms slave is online and ready to get more data). */
+ if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
+ putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
@@ -652,6 +659,7 @@ void replconfCommand(redisClient *c) {
* 3) Update the count of good slaves. */
void putSlaveOnline(redisClient *slave) {
slave->replstate = REDIS_REPL_ONLINE;
+ slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
@@ -660,6 +668,8 @@ void putSlaveOnline(redisClient *slave) {
return;
}
refreshGoodSlavesCount();
+ redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
+ replicationGetSlaveName(slave));
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -713,7 +723,6 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
- redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)");
}
}
@@ -752,10 +761,16 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
* diskless replication, our work is trivial, we can just put
* the slave online. */
if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
- putSlaveOnline(slave);
redisLog(REDIS_NOTICE,
- "Synchronization with slave %s succeeded (socket)",
+ "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
+ /* Note: we wait for a REPLCONF ACK message from slave in
+ * order to really put it online (install the write handler
+ * so that the accumulated data can be transferred). However
+ * we change the replication state ASAP, since our slave
+ * is technically online now. */
+ slave->replstate = REDIS_REPL_ONLINE;
+ slave->repl_put_online_on_ack = 1;
} else {
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
@@ -929,7 +944,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
int eof_reached = 0;
if (usemark) {
- /* Update the last bytes array, and check if it matches our delimiter. */
+ /* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= REDIS_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
} else {