From 5ee2ccf48e75012b2cabefd89f40bd09a1f10258 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 17:09:29 +0200 Subject: Diskless replication: EOF: streaming support slave side. --- src/replication.c | 67 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/src/replication.c b/src/replication.c index 927b99174..28ecd0bd4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -818,6 +818,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); + /* Static vars used to hold the EOF mark, and the last bytes received + * from the server: when they match, we reached the end of the transfer. */ + static char eofmark[REDIS_RUN_ID_SIZE]; + static char lastbytes[REDIS_RUN_ID_SIZE]; + static int usemark = 0; + /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ if (server.repl_transfer_size == -1) { @@ -843,16 +849,41 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } - server.repl_transfer_size = strtol(buf+1,NULL,10); - redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + + /* There are two possible forms for the bulk payload. One is the + * usual $<count> bulk format. The other is used for diskless transfers + * when the master does not know beforehand the size of the file to + * transfer. In the latter case, the following format is used: + * + * $EOF:<40 bytes delimiter> + * + * At the end of the file the announced delimiter is transmitted. The + * delimiter is long and random enough that the probability of a + * collision with the actual file content can be ignored. 
*/ + if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { + usemark = 1; + memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); + memset(lastbytes,0,REDIS_RUN_ID_SIZE); + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving streamed RDB from master"); + } else { + usemark = 0; + server.repl_transfer_size = strtol(buf+1,NULL,10); + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving %lld bytes from master", + (long long) server.repl_transfer_size); + } return; } /* Read bulk data */ - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); + if (usemark) { + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); + } else { + readlen = sizeof(buf); + } + nread = read(fd,buf,readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", @@ -860,6 +891,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { replicationAbortSyncTransfer(); return; } + + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our delimiter. 
*/ + if (nread >= REDIS_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); + } else { + int rem = REDIS_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; + } + server.repl_transfer_lastio = server.unixtime; if (write(server.repl_transfer_fd,buf,nread) != nread) { redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); @@ -881,7 +929,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Check if the transfer is now complete */ - if (server.repl_transfer_read == server.repl_transfer_size) { + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + + if (eof_reached) { if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); -- cgit v1.2.1