diff options
author | Oran Agra <oran@redislabs.com> | 2015-07-21 11:55:17 +0300 |
---|---|---|
committer | Oran Agra <oran@redislabs.com> | 2015-07-21 11:55:17 +0300 |
commit | eb706b42023d22fcc06e4c79973d0c53c09de7ba (patch) | |
tree | 1e6fdf512b63ecae3a1f2f9d3bf3cf8fba961a53 /src/replication.c | |
parent | 9e67df2a39a37386cb38701f287857fd95c31527 (diff) | |
download | redis-eb706b42023d22fcc06e4c79973d0c53c09de7ba.tar.gz |
introduce REPLCONF eof-supported and repl-diskless-load
Diffstat (limited to 'src/replication.c')
-rw-r--r-- | src/replication.c | 68 |
1 files changed, 47 insertions, 21 deletions
diff --git a/src/replication.c b/src/replication.c index 7cdfdb165..90ac9c4c8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -442,13 +442,13 @@ need_full_resync: * the script cache is flushed before to start. * * Returns REDIS_OK on success or REDIS_ERR otherwise. */ -int startBgsaveForReplication(void) { +int startBgsaveForReplication(int use_eof) { int retval; redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", - server.repl_diskless_sync ? "slaves sockets" : "disk"); + use_eof ? "slaves sockets" : "disk"); - if (server.repl_diskless_sync) + if (use_eof) retval = rdbSaveToSlavesSockets(); else retval = rdbSaveBackground(server.rdb_filename); @@ -553,7 +553,7 @@ void syncCommand(redisClient *c) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - if (server.repl_diskless_sync) { + if (server.repl_diskless_sync && c->repl_eof_supported) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ @@ -562,7 +562,7 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); } else { /* Ok we don't have a BGSAVE in progress, let's start one. */ - if (startBgsaveForReplication() != REDIS_OK) { + if (startBgsaveForReplication(0) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; @@ -613,6 +613,8 @@ void replconfCommand(redisClient *c) { &port,NULL) != REDIS_OK)) return; c->slave_listening_port = port; + } else if (!strcasecmp(c->argv[j]->ptr,"eof-supported")) { + c->repl_eof_supported = 1; } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an @@ -745,7 +747,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { * (if it had a disk or socket target). */ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; - int startbgsave = 0; + int slaves_waiting_eof = 0; + int slaves_waiting_noneof = 0; listIter li; listRewind(server.slaves,&li); @@ -753,7 +756,10 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { - startbgsave = 1; + if (slave->repl_eof_supported) + slaves_waiting_eof++; + else + slaves_waiting_noneof++; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; @@ -801,8 +807,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { } } } - if (startbgsave) { - if (startBgsaveForReplication() != REDIS_OK) { + if (slaves_waiting_eof || slaves_waiting_noneof) { + /* if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */ + if (startBgsaveForReplication(slaves_waiting_noneof==0) != REDIS_OK) { listIter li; listRewind(server.slaves,&li); @@ -825,7 +832,7 @@ void replicationAbortSyncTransfer(void) { aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); close(server.repl_transfer_s); - if (!server.repl_diskless_sync) { + if (server.repl_transfer_fd!=-1) { close(server.repl_transfer_fd); unlink(server.repl_transfer_tmpfile); zfree(server.repl_transfer_tmpfile); @@ -938,18 +945,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * at the next call. */ server.repl_transfer_size = 0; redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving streamed RDB from master"); + "MASTER <-> SLAVE sync: receiving streamed RDB from master with EOF %s", + server.repl_diskless_load? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + "MASTER <-> SLAVE sync: receiving %lld bytes from master %s", + (long long) server.repl_transfer_size, + server.repl_diskless_load? "to parser":"to disk"); } return; } - if (!server.repl_diskless_sync) { + if (!server.repl_diskless_load) { /* read the data from the socket, store it to a file and search for the EOF */ if (usemark) { readlen = sizeof(buf); @@ -1033,7 +1042,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); - if (server.repl_diskless_sync) { + if (server.repl_diskless_load) { rio rdb; rioInitWithFd(&rdb,fd); /* Put the socket in blocking mode to simplify RDB transfer. @@ -1370,6 +1379,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } sdsfree(err); } + + /* Inform the master that this slave supports EOF marker of diskless-sync */ + { + err = sendSynchronousCommand(fd,"REPLCONF","eof-supported","yes", + NULL); + /* Ignore the error if any, not all the Redis versions support + * REPLCONF eof-supported. */ + if (err[0] == '-') { + redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF eof-supported: %s", err); + } + sdsfree(err); + } /* Try a partial resynchonization. If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC @@ -1395,7 +1416,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - if (!server.repl_diskless_sync) { + if (!server.repl_diskless_load) { while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); @@ -2133,7 +2154,8 @@ void replicationCron(void) { * slaves in WAIT_BGSAVE_START state. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; - int slaves_waiting = 0; + int slaves_waiting_eof = 0; + int slaves_waiting_noneof = 0; listNode *ln; listIter li; @@ -2143,14 +2165,18 @@ void replicationCron(void) { if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; - slaves_waiting++; + if (slave->repl_eof_supported) + slaves_waiting_eof++; + else + slaves_waiting_noneof++; } } - if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { + if ((slaves_waiting_eof || slaves_waiting_noneof) && max_idle > server.repl_diskless_sync_delay) { /* Start a BGSAVE. Usually with socket target, or with disk target - * if there was a recent socket -> disk config change. */ - if (startBgsaveForReplication() == REDIS_OK) { + * if there was a recent socket -> disk config change. + * if there is at least one slave that doesn't support EOF, we'll start an non-eof replication */ + if (startBgsaveForReplication(slaves_waiting_noneof==0) == REDIS_OK) { /* It started! We need to change the state of slaves * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case * the current target is disk. Otherwise it was already done |