summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-10-14 10:11:26 +0200
committerantirez <antirez@gmail.com>2014-10-14 10:11:29 +0200
commit75f0cd6520c73bc868717940ed583c4809ab30db (patch)
tree6b4807ec5e88adfa15afcf1fc22e19209fc00bc2
parent850ea57c37e517eb0f10d8fc319332ca339d0ba2 (diff)
downloadredis-75f0cd6520c73bc868717940ed583c4809ab30db.tar.gz
Diskless replication: RDB -> slaves transfer draft implementation.
-rw-r--r--src/rdb.c139
-rw-r--r--src/rdb.h1
-rw-r--r--src/redis.c1
-rw-r--r--src/redis.h6
-rw-r--r--src/replication.c81
5 files changed, 200 insertions, 28 deletions
diff --git a/src/rdb.c b/src/rdb.c
index bd6d1e579..4d45c424f 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -689,6 +689,32 @@ werr:
return REDIS_ERR;
}
+/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
+ * and a suffix to the generated RDB dump. The prefix is:
+ *
+ * $EOF:<40 bytes unguessable hex string>\r\n
+ *
+ * While the suffix is the 40 bytes hex string we announced in the prefix.
+ * This way processes receiving the payload can understand when it ends
+ * without doing any processing of the content. */
+int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
+ char eofmark[REDIS_EOF_MARK_SIZE];
+
+ getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE);
+ if (error) *error = 0;
+ if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
+ if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
+ if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
+ if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr;
+ if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
+ return REDIS_OK;
+
+werr: /* Write error. */
+ /* Set 'error' only if not already set by rdbSaveRio() call. */
+ if (error && *error == 0) *error = errno;
+ return REDIS_ERR;
+}
+
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
int rdbSave(char *filename) {
char tmpfile[256];
@@ -1211,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
return REDIS_ERR; /* Just to avoid warning */
}
-/* A background saving child (BGSAVE) terminated its work. Handle this. */
-void backgroundSaveDoneHandler(int exitcode, int bysignal) {
+/* A background saving child (BGSAVE) terminated its work. Handle this.
+ * This function covers the case of actual BGSAVEs. */
+void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
@@ -1242,7 +1269,113 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
server.rdb_save_time_start = -1;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
- updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR);
+ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK);
+}
+
+/* A background saving child (BGSAVE) terminated its work. Handle this.
+ * This function covers the case of RDB -> Salves socket transfers for
+ * diskless replication. */
+void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
+ if (!bysignal && exitcode == 0) {
+ redisLog(REDIS_NOTICE,
+ "Background RDB transfer terminated with success");
+ } else if (!bysignal && exitcode != 0) {
+ redisLog(REDIS_WARNING, "Background transfer error");
+ } else {
+ redisLog(REDIS_WARNING,
+ "Background transfer terminated by signal %d", bysignal);
+ }
+ server.rdb_child_pid = -1;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
+ server.rdb_save_time_start = -1;
+ /* Possibly there are slaves waiting for a BGSAVE in order to be served
+ * (the first stage of SYNC is a bulk transfer of dump.rdb) */
+ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET);
+}
+
+/* When a background RDB saving/transfer terminates, call the right handler. */
+void backgroundSaveDoneHandler(int exitcode, int bysignal) {
+ switch(server.rdb_child_type) {
+ case REDIS_RDB_CHILD_TYPE_DISK:
+ backgroundSaveDoneHandlerDisk(exitcode,bysignal);
+ break;
+ case REDIS_RDB_CHILD_TYPE_SOCKET:
+ backgroundSaveDoneHandlerSocket(exitcode,bysignal);
+ break;
+ default:
+ redisPanic("Unknown RDB child type.");
+ break;
+ }
+}
+
+/* Spawn an RDB child that writes the RDB to the sockets of the slaves
+ * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */
+int rdbSaveToSlavesSockets(void) {
+ int *fds;
+ int numfds;
+ listNode *ln;
+ listIter li;
+ pid_t childpid;
+ long long start;
+
+ if (server.rdb_child_pid != -1) return REDIS_ERR;
+
+ fds = zmalloc(sizeof(int)*listLength(server.slaves));
+ numfds = 0;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+
+ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
+ fds[numfds++] = slave->fd;
+ slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
+ }
+ }
+
+ /* Fork ... */
+ start = ustime();
+ if ((childpid = fork()) == 0) {
+ /* Child */
+ int retval;
+ rio slave_sockets;
+
+ rioInitWithFdset(&slave_sockets,fds,numfds);
+ zfree(fds);
+
+ closeListeningSockets(0);
+ redisSetProcTitle("redis-rdb-to-slaves");
+
+ retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
+ if (retval == REDIS_OK) {
+ size_t private_dirty = zmalloc_get_private_dirty();
+
+ if (private_dirty) {
+ redisLog(REDIS_NOTICE,
+ "RDB: %zu MB of memory used by copy-on-write",
+ private_dirty/(1024*1024));
+ }
+ }
+ exitFromChild((retval == REDIS_OK) ? 0 : 1);
+ } else {
+ /* Parent */
+ server.stat_fork_time = ustime()-start;
+ server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
+ latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
+ if (childpid == -1) {
+ redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
+ strerror(errno));
+ return REDIS_ERR;
+ }
+ redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid);
+ server.rdb_save_time_start = time(NULL);
+ server.rdb_child_pid = childpid;
+ server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET;
+ updateDictResizePolicy();
+ zfree(fds);
+ return REDIS_OK;
+ }
+ return REDIS_OK; /* unreached */
}
void saveCommand(redisClient *c) {
diff --git a/src/rdb.h b/src/rdb.h
index 54ee4e514..eb40d4993 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename);
int rdbSaveBackground(char *filename);
+int rdbSaveToSlavesSockets(void);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename);
int rdbSaveObject(rio *rdb, robj *o);
diff --git a/src/redis.c b/src/redis.c
index 3340ecef9..d8ba5675a 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -1480,6 +1480,7 @@ void initServerConfig(void) {
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
+ server.repl_diskless = REDIS_DEFAULT_RDB_DISKLESS;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;
diff --git a/src/redis.h b/src/redis.h
index 5e756f0f0..d8f2b444c 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -96,6 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_REPL_TIMEOUT 60
#define REDIS_REPL_PING_SLAVE_PERIOD 10
#define REDIS_RUN_ID_SIZE 40
+#define REDIS_EOF_MARK_SIZE 40
#define REDIS_OPS_SEC_SAMPLES 16
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
@@ -113,6 +114,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_DEFAULT_RDB_COMPRESSION 1
#define REDIS_DEFAULT_RDB_CHECKSUM 1
#define REDIS_DEFAULT_RDB_FILENAME "dump.rdb"
+#define REDIS_DEFAULT_RDB_DISKLESS 0
+#define REIDS_DEFAULT_RDB_DISKLESS_DELAY 5
#define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define REDIS_DEFAULT_SLAVE_READ_ONLY 1
#define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0
@@ -796,6 +799,7 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
+ int repl_diskless; /* Send RDB to slaves sockets directly. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
@@ -1138,7 +1142,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
-void updateSlavesWaitingBgsave(int bgsaveerr);
+void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
void replicationHandleMasterDisconnection(void);
void replicationCacheMaster(redisClient *c);
diff --git a/src/replication.c b/src/replication.c
index f4ac58f4d..ba39fddb1 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -408,6 +408,28 @@ need_full_resync:
return REDIS_ERR;
}
+/* Start a BGSAVE for replication goals, which is, selecting the disk or
+ * socket target depending on the configuration, and making sure that
+ * the script cache is flushed before to start.
+ *
+ * Returns REDIS_OK on success or REDIS_ERR otherwise. */
+int startBgsaveForReplication(void) {
+ int retval;
+
+ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
+ server.repl_diskless ? "slaves sockets" : "disk");
+
+ if (server.repl_diskless)
+ retval = rdbSaveToSlavesSockets();
+ else
+ retval = rdbSaveBackground(server.rdb_filename);
+
+ /* Flush the script cache, since we need that slave differences are
+ * accumulated without requiring slaves to match our cached scripts. */
+ if (retval == REDIS_OK) replicationScriptCacheFlush();
+ return retval;
+}
+
/* SYNC and PSYNC command implemenation. */
void syncCommand(redisClient *c) {
/* ignore SYNC if already slave or in monitor mode */
@@ -465,7 +487,9 @@ void syncCommand(redisClient *c) {
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
- if (server.rdb_child_pid != -1) {
+ if (server.rdb_child_pid != -1 &&
+ server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
+ {
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
@@ -480,12 +504,7 @@ void syncCommand(redisClient *c) {
}
if (ln) {
/* Perfect, the server is already registering differences for
- * another slave. Set the right state, and copy the buffer.
- *
- * Note that if we found a slave in WAIT_BGSAVE_END state, this
- * means that the current child is of type
- * REDIS_RDB_CHILD_TYPE_DISK, since the first slave in this state
- * can only be added when an RDB save with disk target is started. */
+ * another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
@@ -495,17 +514,31 @@ void syncCommand(redisClient *c) {
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
+ } else if (server.rdb_child_pid != -1 &&
+ server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
+ {
+ /* There is an RDB child process but it is writing directly to
+ * children sockets. We need to wait for the next BGSAVE
+ * in order to synchronize. */
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+ redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
} else {
- /* Ok we don't have a BGSAVE in progress, let's start one. */
- redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
- if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
- redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
- addReplyError(c,"Unable to perform background save");
- return;
+ if (server.repl_diskless) {
+ /* Diskless replication RDB child is created inside
+ * replicationCron() since we want to delay its start a
+ * few seconds to wait for more slaves to arrive. */
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+ redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
+ } else {
+ /* Ok we don't have a BGSAVE in progress, let's start one. */
+ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
+ if (startBgsaveForReplication() != REDIS_OK) {
+ redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
+ addReplyError(c,"Unable to perform background save");
+ return;
+ }
+ c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
}
- c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
- /* Flush the script cache for the new slave. */
- replicationScriptCacheFlush();
}
if (server.repl_disable_tcp_nodelay)
@@ -644,10 +677,15 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
/* This function is called at the end of every background saving.
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
+ * The 'type' argument is the type of the child that terminated
+ * (if it had a disk or socket target).
*
* The goal of this function is to handle slaves waiting for a successful
- * background saving in order to perform non-blocking synchronization. */
-void updateSlavesWaitingBgsave(int bgsaveerr) {
+ * background saving in order to perform non-blocking synchronization, and
+ * to schedule a new BGSAVE if there are slaves that attached while a
+ * BGSAVE was in progress, but it was not a good one for replication (no
+ * other slave was accumulating differences). */
+void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
listIter li;
@@ -687,12 +725,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
}
}
if (startbgsave) {
- /* Since we are starting a new background save for one or more slaves,
- * we flush the Replication Script Cache to use EVAL to propagate every
- * new EVALSHA for the first time, since all the new slaves don't know
- * about previous scripts. */
- replicationScriptCacheFlush();
- if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
+ if (startBgsaveForReplication() != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);