diff options
author | antirez <antirez@gmail.com> | 2019-07-08 18:32:47 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2019-07-08 18:32:47 +0200 |
commit | 81b18fa3a0926b60a59083eee144cbf3d0e2fd64 (patch) | |
tree | 5079162eff23cd49c0a20fce4e7a6f8eefbceed9 | |
parent | d984732b3517dae198422080a6adf0cc96c1dd92 (diff) | |
download | redis-81b18fa3a0926b60a59083eee144cbf3d0e2fd64.tar.gz |
Diskless replica: a few aesthetic changes to replication.c.
-rw-r--r-- | src/replication.c | 126 | ||||
-rw-r--r-- | src/rio.c | 12 |
2 files changed, 96 insertions, 42 deletions
diff --git a/src/replication.c b/src/replication.c index e2bac08bd..a7c1c0d6a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1127,7 +1127,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { ssize_t nread, readlen, nwritten; int use_diskless_load; redisDb *diskless_load_backup = NULL; - int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; int i; off_t left; UNUSED(el); @@ -1199,8 +1200,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { use_diskless_load = useDisklessLoad(); if (!use_diskless_load) { - - /* read the data from the socket, store it to a file and search for the EOF */ + /* Read the data from the socket, store it to a file and search + * for the EOF. */ if (usemark) { readlen = sizeof(buf); } else { @@ -1222,20 +1223,28 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { int eof_reached = 0; if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter.*/ + /* Update the last bytes array, and check if it matches our + * delimiter. */ if (nread >= CONFIG_RUN_ID_SIZE) { - memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE, + CONFIG_RUN_ID_SIZE); } else { int rem = CONFIG_RUN_ID_SIZE-nread; memmove(lastbytes,lastbytes+nread,rem); memcpy(lastbytes+rem,buf,nread); } - if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; + if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) + eof_reached = 1; } + /* Update the last I/O time for the replication transfer (used in + * order to detect timeouts during replication), and write what we + * got from the socket to the dump file on disk. */ server.repl_transfer_lastio = server.unixtime; if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { - serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", + serverLog(LL_WARNING, + "Write error or short write writing to the DB dump file " + "needed for MASTER <-> REPLICA synchronization: %s", (nwritten == -1) ? strerror(errno) : "short write"); goto error; } @@ -1246,14 +1255,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { - serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + serverLog(LL_WARNING, + "Error truncating the RDB file received from the master " + "for SYNC: %s", strerror(errno)); goto error; } } - /* Sync data on disk from time to time, otherwise at the end of the transfer - * we may suffer a big delay as the memory buffers are copied into the - * actual disk. */ + /* Sync data on disk from time to time, otherwise at the end of the + * transfer we may suffer a big delay as the memory buffers are copied + * into the actual disk. */ if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { @@ -1269,19 +1280,34 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; } - if (!eof_reached) - return; + + /* If the transfer is yet not complete, we need to read more, so + * return ASAP and wait for the handler to be called again. */ + if (!eof_reached) return; } - /* We reach here when the slave is using diskless replication, - * or when we are done reading from the socket to the rdb file. */ + /* We reach this point in one of the following cases: + * + * 1. The replica is using diskless replication, that is, it reads data + * directly from the socket to the Redis memory, without using + * a temporary RDB file on disk. In that case we just block and + * read everything from the socket. + * + * 2. Or when we are done reading from the socket to the RDB file, in + * such case we want just to read the RDB file in memory. */ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - /* We need to stop any AOFRW fork before flusing and parsing - * RDB, otherwise we'll create a copy-on-write disaster. */ + + /* We need to stop any AOF rewriting child before flusing and parsing + * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); signalFlushedDb(-1); - if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* create a backup of the current db */ + + /* When diskless RDB loading is used by replicas, it may be configured + * in order to save the current DB instead of throwing it away, + * so that we can restore it in case of failed transfer. */ + if (use_diskless_load && + server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) + { diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum); for (i=0; i<server.dbnum; i++) { diskless_load_backup[i] = server.db[i]; @@ -1291,6 +1317,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } else { emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } + /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to @@ -1301,21 +1328,25 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (use_diskless_load) { rio rdb; rioInitWithFd(&rdb,fd,server.repl_transfer_size); + /* Put the socket in blocking mode to simplify RDB transfer. * We'll restore it when the RDB is received. */ anetBlock(NULL,fd); anetRecvTimeout(NULL,fd,server.repl_timeout*1000); - startLoading(server.repl_transfer_size); + if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { - /* rdbloading failed */ + /* RDB loading failed. */ stopLoading(); - serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket"); + serverLog(LL_WARNING, + "Failed trying to load the MASTER synchronization DB " + "from socket"); cancelReplicationHandshake(); rioFreeFd(&rdb, NULL); if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* restore the backed up db */ - emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); + /* Restore the backed up databases. */ + emptyDbGeneric(server.db,-1,empty_db_flags, + replicationEmptyDbCallback); for (i=0; i<server.dbnum; i++) { dictRelease(server.db[i].dict); dictRelease(server.db[i].expires); @@ -1323,35 +1354,47 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } zfree(diskless_load_backup); } else { - /* Remove the half-loaded data */ + /* Remove the half-loaded data in case we started with + * an empty replica. */ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } - /* Note that there's no point in restarting the AOF on sync failure, - it'll be restarted when sync succeeds or slave promoted. */ + + /* Note that there's no point in restarting the AOF on SYNC + * failure, it'll be restarted when sync succeeds or the replica + * gets promoted. */ return; } stopLoading(); - /* rdbloading succeeded */ + + /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* delete the backup db that we created before starting to load the new rdb */ - emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback); + /* Delete the backup databases we created before starting to load + * the new RDB. Now the RDB was loaded with success so the old + * data is useless. */ + emptyDbGeneric(diskless_load_backup,-1,empty_db_flags, + replicationEmptyDbCallback); for (i=0; i<server.dbnum; i++) { dictRelease(diskless_load_backup[i].dict); dictRelease(diskless_load_backup[i].expires); } zfree(diskless_load_backup); } + + /* Verify the end mark is correct. */ if (usemark) { - if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) { + if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || + memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) + { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); cancelReplicationHandshake(); rioFreeFd(&rdb, NULL); return; } } - /* get the unread command stream from the rio buffer */ + + /* Cleanup and restore the socket to the original state to continue + * with the normal replication. */ rioFreeFd(&rdb, NULL); - /* Restore the socket as non-blocking. */ anetNonBlock(NULL,fd); anetRecvTimeout(NULL,fd,0); } else { @@ -1367,40 +1410,49 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { - serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s", + serverLog(LL_WARNING, + "Failed trying to rename the temp DB into %s in " + "MASTER <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); cancelReplicationHandshake(); return; } if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { - serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); + serverLog(LL_WARNING, + "Failed trying to load the MASTER synchronization " + "DB from disk"); cancelReplicationHandshake(); /* Note that there's no point in restarting the AOF on sync failure, - it'll be restarted when sync succeeds or slave promoted. */ + it'll be restarted when sync succeeds or replica promoted. */ return; } + + /* Cleanup. */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); server.repl_transfer_fd = -1; server.repl_transfer_tmpfile = NULL; } + /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ memcpy(server.replid,server.master->replid,sizeof(server.replid)); server.master_repl_offset = server.master->reploff; clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to * accumulate the backlog regardless of the fact they have sub-slaves * or not, in order to behave correctly if they are promoted to * masters after a failover. */ if (server.repl_backlog == NULL) createReplicationBacklog(); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ @@ -173,13 +173,13 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) { /* if the buffer is too small for the entire request: realloc */ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); - + /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { sdsrange(r->io.fd.buf, r->io.fd.pos, -1); r->io.fd.pos = 0; } - + /* if we don't already have all the data in the sds, read more */ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; @@ -251,8 +251,10 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) { /* release the rio stream. * optionally returns the unread buffered data. */ -void rioFreeFd(rio *r, sds* out_remainingBufferedData) { - if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { +void rioFreeFd(rio *r, sds *out_remainingBufferedData) { + if (out_remainingBufferedData && + (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) + { if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1); *out_remainingBufferedData = r->io.fd.buf; @@ -264,7 +266,7 @@ void rioFreeFd(rio *r, sds* out_remainingBufferedData) { r->io.fd.buf = NULL; } -/* ------------------- File descriptors set implementation ------------------- */ +/* ------------------- File descriptors set implementation ------------------ */ /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write |