summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2015-07-21 11:55:17 +0300
committerOran Agra <oran@redislabs.com>2015-07-21 11:55:17 +0300
commiteb706b42023d22fcc06e4c79973d0c53c09de7ba (patch)
tree1e6fdf512b63ecae3a1f2f9d3bf3cf8fba961a53 /src/replication.c
parent9e67df2a39a37386cb38701f287857fd95c31527 (diff)
downloadredis-eb706b42023d22fcc06e4c79973d0c53c09de7ba.tar.gz
introduce REPLCONF eof-supported and repl-diskless-load
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c68
1 files changed, 47 insertions, 21 deletions
diff --git a/src/replication.c b/src/replication.c
index 7cdfdb165..90ac9c4c8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -442,13 +442,13 @@ need_full_resync:
* the script cache is flushed before to start.
*
* Returns REDIS_OK on success or REDIS_ERR otherwise. */
-int startBgsaveForReplication(void) {
+int startBgsaveForReplication(int use_eof) {
int retval;
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
- server.repl_diskless_sync ? "slaves sockets" : "disk");
+ use_eof ? "slaves sockets" : "disk");
- if (server.repl_diskless_sync)
+ if (use_eof)
retval = rdbSaveToSlavesSockets();
else
retval = rdbSaveBackground(server.rdb_filename);
@@ -553,7 +553,7 @@ void syncCommand(redisClient *c) {
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
} else {
- if (server.repl_diskless_sync) {
+ if (server.repl_diskless_sync && c->repl_eof_supported) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
@@ -562,7 +562,7 @@ void syncCommand(redisClient *c) {
redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
} else {
/* Ok we don't have a BGSAVE in progress, let's start one. */
- if (startBgsaveForReplication() != REDIS_OK) {
+ if (startBgsaveForReplication(0) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
@@ -613,6 +613,8 @@ void replconfCommand(redisClient *c) {
&port,NULL) != REDIS_OK))
return;
c->slave_listening_port = port;
+ } else if (!strcasecmp(c->argv[j]->ptr,"eof-supported")) {
+ c->repl_eof_supported = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@@ -745,7 +747,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
* (if it had a disk or socket target). */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
- int startbgsave = 0;
+ int slaves_waiting_eof = 0;
+ int slaves_waiting_noneof = 0;
listIter li;
listRewind(server.slaves,&li);
@@ -753,7 +756,10 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
- startbgsave = 1;
+ if (slave->repl_eof_supported)
+ slaves_waiting_eof++;
+ else
+ slaves_waiting_noneof++;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
@@ -801,8 +807,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
}
}
}
- if (startbgsave) {
- if (startBgsaveForReplication() != REDIS_OK) {
+ if (slaves_waiting_eof || slaves_waiting_noneof) {
+ /* if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */
+ if (startBgsaveForReplication(slaves_waiting_noneof==0) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
@@ -825,7 +832,7 @@ void replicationAbortSyncTransfer(void) {
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
close(server.repl_transfer_s);
- if (!server.repl_diskless_sync) {
+ if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
@@ -938,18 +945,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
redisLog(REDIS_NOTICE,
- "MASTER <-> SLAVE sync: receiving streamed RDB from master");
+ "MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s",
+ server.repl_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
- "MASTER <-> SLAVE sync: receiving %lld bytes from master",
- (long long) server.repl_transfer_size);
+ "MASTER <-> SLAVE sync: receiving %lld bytes from master %s",
+ (long long) server.repl_transfer_size,
+ server.repl_diskless_load? "to parser":"to disk");
}
return;
}
- if (!server.repl_diskless_sync) {
+ if (!server.repl_diskless_load) {
/* read the data from the socket, store it to a file and search for the EOF */
if (usemark) {
readlen = sizeof(buf);
@@ -1033,7 +1042,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
- if (server.repl_diskless_sync) {
+ if (server.repl_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd);
/* Put the socket in blocking mode to simplify RDB transfer.
@@ -1370,6 +1379,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
sdsfree(err);
}
+
+ /* Inform the master that this slave supports EOF marker of diskless-sync */
+ {
+ err = sendSynchronousCommand(fd,"REPLCONF","eof-supported","yes",
+ NULL);
+ /* Ignore the error if any, not all the Redis versions support
+ * REPLCONF eof-supported. */
+ if (err[0] == '-') {
+ redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF eof-supported: %s", err);
+ }
+ sdsfree(err);
+ }
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
@@ -1395,7 +1416,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
- if (!server.repl_diskless_sync) {
+ if (!server.repl_diskless_load) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
@@ -2133,7 +2154,8 @@ void replicationCron(void) {
* slaves in WAIT_BGSAVE_START state. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
time_t idle, max_idle = 0;
- int slaves_waiting = 0;
+ int slaves_waiting_eof = 0;
+ int slaves_waiting_noneof = 0;
listNode *ln;
listIter li;
@@ -2143,14 +2165,18 @@ void replicationCron(void) {
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
- slaves_waiting++;
+ if (slave->repl_eof_supported)
+ slaves_waiting_eof++;
+ else
+ slaves_waiting_noneof++;
}
}
- if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
+ if ((slaves_waiting_eof || slaves_waiting_noneof) && max_idle > server.repl_diskless_sync_delay) {
/* Start a BGSAVE. Usually with socket target, or with disk target
- * if there was a recent socket -> disk config change. */
- if (startBgsaveForReplication() == REDIS_OK) {
+ * if there was a recent socket -> disk config change.
+ * if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */
+ if (startBgsaveForReplication(slaves_waiting_noneof==0) == REDIS_OK) {
/* It started! We need to change the state of slaves
* from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
* the current target is disk. Otherwise it was already done