summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-07-08 18:32:47 +0200
committerantirez <antirez@gmail.com>2019-07-08 18:32:47 +0200
commit81b18fa3a0926b60a59083eee144cbf3d0e2fd64 (patch)
tree5079162eff23cd49c0a20fce4e7a6f8eefbceed9
parentd984732b3517dae198422080a6adf0cc96c1dd92 (diff)
downloadredis-81b18fa3a0926b60a59083eee144cbf3d0e2fd64.tar.gz
Diskless replica: a few aesthetic changes to replication.c.
-rw-r--r--src/replication.c126
-rw-r--r--src/rio.c12
2 files changed, 96 insertions, 42 deletions
diff --git a/src/replication.c b/src/replication.c
index e2bac08bd..a7c1c0d6a 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1127,7 +1127,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
ssize_t nread, readlen, nwritten;
int use_diskless_load;
redisDb *diskless_load_backup = NULL;
- int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
+ int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
+ EMPTYDB_NO_FLAGS;
int i;
off_t left;
UNUSED(el);
@@ -1199,8 +1200,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
-
- /* read the data from the socket, store it to a file and search for the EOF */
+ /* Read the data from the socket, store it to a file and search
+ * for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
@@ -1222,20 +1223,28 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
int eof_reached = 0;
if (usemark) {
- /* Update the last bytes array, and check if it matches our delimiter.*/
+ /* Update the last bytes array, and check if it matches our
+ * delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
- memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
+ memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
+ CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
- if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
+ if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
+ eof_reached = 1;
}
+ /* Update the last I/O time for the replication transfer (used in
+ * order to detect timeouts during replication), and write what we
+ * got from the socket to the dump file on disk. */
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
- serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
+ serverLog(LL_WARNING,
+ "Write error or short write writing to the DB dump file "
+ "needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
@@ -1246,14 +1255,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
- serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
+ serverLog(LL_WARNING,
+ "Error truncating the RDB file received from the master "
+ "for SYNC: %s", strerror(errno));
goto error;
}
}
- /* Sync data on disk from time to time, otherwise at the end of the transfer
- * we may suffer a big delay as the memory buffers are copied into the
- * actual disk. */
+ /* Sync data on disk from time to time, otherwise at the end of the
+ * transfer we may suffer a big delay as the memory buffers are copied
+ * into the actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
@@ -1269,19 +1280,34 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
- if (!eof_reached)
- return;
+
+ /* If the transfer is yet not complete, we need to read more, so
+ * return ASAP and wait for the handler to be called again. */
+ if (!eof_reached) return;
}
- /* We reach here when the slave is using diskless replication,
- * or when we are done reading from the socket to the rdb file. */
+ /* We reach this point in one of the following cases:
+ *
+ * 1. The replica is using diskless replication, that is, it reads data
+ * directly from the socket to the Redis memory, without using
+ * a temporary RDB file on disk. In that case we just block and
+ * read everything from the socket.
+ *
+ * 2. Or when we are done reading from the socket to the RDB file, in
+ * such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
- /* We need to stop any AOFRW fork before flusing and parsing
- * RDB, otherwise we'll create a copy-on-write disaster. */
+
+ /* We need to stop any AOF rewriting child before flusing and parsing
+ * the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
signalFlushedDb(-1);
- if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
- /* create a backup of the current db */
+
+ /* When diskless RDB loading is used by replicas, it may be configured
+ * in order to save the current DB instead of throwing it away,
+ * so that we can restore it in case of failed transfer. */
+ if (use_diskless_load &&
+ server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
+ {
diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum);
for (i=0; i<server.dbnum; i++) {
diskless_load_backup[i] = server.db[i];
@@ -1291,6 +1317,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
} else {
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
+
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
@@ -1301,21 +1328,25 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (use_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd,server.repl_transfer_size);
+
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
anetBlock(NULL,fd);
anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
-
startLoading(server.repl_transfer_size);
+
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
- /* rdbloading failed */
+ /* RDB loading failed. */
stopLoading();
- serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket");
+ serverLog(LL_WARNING,
+ "Failed trying to load the MASTER synchronization DB "
+ "from socket");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
- /* restore the backed up db */
- emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
+ /* Restore the backed up databases. */
+ emptyDbGeneric(server.db,-1,empty_db_flags,
+ replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(server.db[i].dict);
dictRelease(server.db[i].expires);
@@ -1323,35 +1354,47 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
zfree(diskless_load_backup);
} else {
- /* Remove the half-loaded data */
+ /* Remove the half-loaded data in case we started with
+ * an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
- /* Note that there's no point in restarting the AOF on sync failure,
- it'll be restarted when sync succeeds or slave promoted. */
+
+ /* Note that there's no point in restarting the AOF on SYNC
+ * failure, it'll be restarted when sync succeeds or the replica
+ * gets promoted. */
return;
}
stopLoading();
- /* rdbloading succeeded */
+
+ /* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
- /* delete the backup db that we created before starting to load the new rdb */
- emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback);
+ /* Delete the backup databases we created before starting to load
+ * the new RDB. Now the RDB was loaded with success so the old
+ * data is useless. */
+ emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,
+ replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(diskless_load_backup[i].dict);
dictRelease(diskless_load_backup[i].expires);
}
zfree(diskless_load_backup);
}
+
+ /* Verify the end mark is correct. */
if (usemark) {
- if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) {
+ if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
+ memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
+ {
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
return;
}
}
- /* get the unread command stream from the rio buffer */
+
+ /* Cleanup and restore the socket to the original state to continue
+ * with the normal replication. */
rioFreeFd(&rdb, NULL);
- /* Restore the socket as non-blocking. */
anetNonBlock(NULL,fd);
anetRecvTimeout(NULL,fd,0);
} else {
@@ -1367,40 +1410,49 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
- serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
+ serverLog(LL_WARNING,
+ "Failed trying to rename the temp DB into %s in "
+ "MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
cancelReplicationHandshake();
return;
}
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
- serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
+ serverLog(LL_WARNING,
+ "Failed trying to load the MASTER synchronization "
+ "DB from disk");
cancelReplicationHandshake();
/* Note that there's no point in restarting the AOF on sync failure,
- it'll be restarted when sync succeeds or slave promoted. */
+ it'll be restarted when sync succeeds or replica promoted. */
return;
}
+
+ /* Cleanup. */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
+
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
+
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
+
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
-
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
+
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
diff --git a/src/rio.c b/src/rio.c
index 993768b56..9327c17a8 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -173,13 +173,13 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) {
/* if the buffer is too small for the entire request: realloc */
if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
-
+
/* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
r->io.fd.pos = 0;
}
-
+
/* if we don't already have all the data in the sds, read more */
while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
@@ -251,8 +251,10 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) {
/* release the rio stream.
* optionally returns the unread buffered data. */
-void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
- if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
+void rioFreeFd(rio *r, sds *out_remainingBufferedData) {
+ if (out_remainingBufferedData &&
+ (size_t)r->io.fd.pos < sdslen(r->io.fd.buf))
+ {
if (r->io.fd.pos > 0)
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
*out_remainingBufferedData = r->io.fd.buf;
@@ -264,7 +266,7 @@ void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
r->io.fd.buf = NULL;
}
-/* ------------------- File descriptors set implementation ------------------- */
+/* ------------------- File descriptors set implementation ------------------ */
/* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write