From 8beb98574ab285c910c50c877d688a11960d5bd5 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 7 Oct 2014 12:56:23 +0200 Subject: RDB file creation refactored to target non-disk target. --- src/rdb.c | 73 ++++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 4d789bc2b..d67dbd9de 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -627,45 +627,37 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, return 1; } -/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ -int rdbSave(char *filename) { +/* Produces a dump of the database in RDB format sending it to the specified + * Redis I/O channel. On success REDIS_OK is returned, otherwise REDIS_ERR + * is returned and part of the output, or all the output, can be + * missing because of I/O errors. + * + * When the function returns REDIS_ERR and if 'error' is not NULL, the + * integer pointed by 'error' is set to the value of errno just after the I/O + * error. 
*/ +int rdbSaveRio(rio *rdb, int *error) { dictIterator *di = NULL; dictEntry *de; - char tmpfile[256]; char magic[10]; int j; long long now = mstime(); - FILE *fp; - rio rdb; uint64_t cksum; - snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); - fp = fopen(tmpfile,"w"); - if (!fp) { - redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s", - strerror(errno)); - return REDIS_ERR; - } - - rioInitWithFile(&rdb,fp); if (server.rdb_checksum) - rdb.update_cksum = rioGenericUpdateChecksum; + rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); - if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr; + if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); - if (!di) { - fclose(fp); - return REDIS_ERR; - } + if (!di) return REDIS_ERR; /* Write the SELECT DB opcode */ - if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; - if (rdbSaveLen(&rdb,j) == -1) goto werr; + if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; + if (rdbSaveLen(rdb,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@ -675,20 +667,48 @@ int rdbSave(char *filename) { initStaticStringObject(key,keystr); expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr; + if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr; } dictReleaseIterator(di); } di = NULL; /* So that we don't release it again on error. */ /* EOF opcode */ - if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; + if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; /* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. 
*/ - cksum = rdb.cksum; + cksum = rdb->cksum; memrev64ifbe(&cksum); - if (rioWrite(&rdb,&cksum,8) == 0) goto werr; + if (rioWrite(rdb,&cksum,8) == 0) goto werr; + return REDIS_OK; + +werr: + if (error) *error = errno; + if (di) dictReleaseIterator(di); + return REDIS_ERR; +} + +/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */ +int rdbSave(char *filename) { + char tmpfile[256]; + FILE *fp; + rio rdb; + int error; + + snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); + fp = fopen(tmpfile,"w"); + if (!fp) { + redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s", + strerror(errno)); + return REDIS_ERR; + } + + rioInitWithFile(&rdb,fp); + if (rdbSaveRio(&rdb,&error) == REDIS_ERR) { + errno = error; + goto werr; + } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; @@ -712,7 +732,6 @@ werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); - if (di) dictReleaseIterator(di); return REDIS_ERR; } -- cgit v1.2.1 From 2df8341c75f46cf2d6ec28804cbda6287766262d Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 8 Oct 2014 09:09:01 +0200 Subject: Define different types of RDB childs. We need to remember what is the saving strategy of the current RDB child process, since the configuration may be modified at runtime via CONFIG SET and still we'll need to understand, when the child exits, what to do and for what goal the process was initiated: to create an RDB file on disk or to write stuff directly to slave's sockets. 
--- src/rdb.c | 2 ++ src/redis.c | 1 + src/redis.h | 6 ++++++ 3 files changed, 9 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index d67dbd9de..bd6d1e579 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -776,6 +776,7 @@ int rdbSaveBackground(char *filename) { redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK; updateDictResizePolicy(); return REDIS_OK; } @@ -1236,6 +1237,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { server.lastbgsave_status = REDIS_ERR; } server.rdb_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start; server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served diff --git a/src/redis.c b/src/redis.c index e7faa8859..3340ecef9 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1768,6 +1768,7 @@ void initServer(void) { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ diff --git a/src/redis.h b/src/redis.h index a1ae0f2bc..5e756f0f0 100644 --- a/src/redis.h +++ b/src/redis.h @@ -361,6 +361,11 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_PROPAGATE_AOF 1 #define REDIS_PROPAGATE_REPL 2 +/* RDB active child save type. */ +#define REDIS_RDB_CHILD_TYPE_NONE 0 +#define REDIS_RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ +#define REDIS_RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */ + /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. 
*/ #define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */ @@ -764,6 +769,7 @@ struct redisServer { time_t lastbgsave_try; /* Unix time of last attempted bgsave */ time_t rdb_save_time_last; /* Time used by last RDB save run. */ time_t rdb_save_time_start; /* Current RDB save start time. */ + int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ /* Propagation of commands in AOF / replication */ -- cgit v1.2.1 From 16546f5aca1236a179bff98af9aaee491cd6fd4d Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 10 Oct 2014 16:25:58 +0200 Subject: Add some comments in syncCommand() to clarify RDB target. --- src/replication.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/replication.c b/src/replication.c index 16014c8a9..f4ac58f4d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -468,7 +468,7 @@ void syncCommand(redisClient *c) { if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is - * registering differences since the server forked to save */ + * registering differences since the server forked to save. */ redisClient *slave; listNode *ln; listIter li; @@ -480,18 +480,23 @@ void syncCommand(redisClient *c) { } if (ln) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. */ + * another slave. Set the right state, and copy the buffer. + * + * Note that if we found a slave in WAIT_BGSAVE_END state, this + * means that the current child is of type + * REDIS_RDB_CHILD_TYPE_DISK, since the first slave in this state + * can only be added when an RDB save with disk target is started. 
*/ copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to - * register differences */ + * register differences. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else { - /* Ok we don't have a BGSAVE in progress, let's start one */ + /* Ok we don't have a BGSAVE in progress, let's start one. */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); -- cgit v1.2.1 From 29db3227ab8980f098080371ef57c4e1eeb38180 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 10 Oct 2014 16:33:16 +0200 Subject: rio.c refactoring before adding a new target. --- src/rio.c | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/rio.c b/src/rio.c index 44f9b7a01..aa5061b75 100644 --- a/src/rio.c +++ b/src/rio.c @@ -55,6 +55,8 @@ #include "config.h" #include "redis.h" +/* ------------------------- Buffer I/O implementation ----------------------- */ + /* Returns 1 or 0 for success/failure. */ static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); @@ -76,6 +78,25 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +static const rio rioBufferIO = { + rioBufferRead, + rioBufferWrite, + rioBufferTell, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithBuffer(rio *r, sds s) { + *r = rioBufferIO; + r->io.buffer.ptr = s; + r->io.buffer.pos = 0; +} + +/* --------------------- Stdio file pointer implementation ------------------- */ + /* Returns 1 or 0 for success/failure. 
*/ static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t retval; @@ -103,17 +124,6 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } -static const rio rioBufferIO = { - rioBufferRead, - rioBufferWrite, - rioBufferTell, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - static const rio rioFileIO = { rioFileRead, rioFileWrite, @@ -132,11 +142,7 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } -void rioInitWithBuffer(rio *r, sds s) { - *r = rioBufferIO; - r->io.buffer.ptr = s; - r->io.buffer.pos = 0; -} +/* ---------------------------- Generic functions ---------------------------- */ /* This function can be installed both in memory and file streams when checksum * computation is needed. */ @@ -157,7 +163,8 @@ void rioSetAutoSync(rio *r, off_t bytes) { r->io.file.autosync = bytes; } -/* ------------------------------ Higher level interface --------------------------- +/* --------------------------- Higher level interface -------------------------- + * * The following higher level functions use lower level rio.c functions to help * generating the Redis protocol for the Append Only File. */ -- cgit v1.2.1 From 850ea57c37e517eb0f10d8fc319332ca339d0ba2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 10 Oct 2014 17:44:06 +0200 Subject: rio.c: draft implementation of fdset target implemented. --- src/rio.c | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/rio.h | 9 +++++++++ 2 files changed, 69 insertions(+) diff --git a/src/rio.c b/src/rio.c index aa5061b75..88f6781e0 100644 --- a/src/rio.c +++ b/src/rio.c @@ -142,6 +142,66 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } +/* ------------------- File descriptors set implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. 
*/ +static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + size_t retval; + int j; + unsigned char *p = (unsigned char*) buf; + + /* Write in little chunchs so that when there are big writes we + * parallelize while the kernel is sending data in background to + * the TCP socket. */ + while(len) { + size_t count = len < 1024 ? len : 1024; + for (j = 0; j < r->io.fdset.numfds; j++) { + retval = write(r->io.fdset.fds[j],p,count); + if (retval != count) return 0; + } + p += count; + len -= count; + r->io.fdset.pos += count; + } + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdsetRead(rio *r, void *buf, size_t len) { + REDIS_NOTUSED(r); + REDIS_NOTUSED(buf); + REDIS_NOTUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdsetTell(rio *r) { + return r->io.fdset.pos; +} + +static const rio rioFdsetIO = { + rioFdsetRead, + rioFdsetWrite, + rioFdsetTell, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFdset(rio *r, int *fds, int numfds) { + *r = rioFdsetIO; + r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + r->io.fdset.numfds = numfds; + r->io.fdset.pos = 0; +} + +void rioFreeFdset(rio *r) { + zfree(r->io.fdset.fds); +} + /* ---------------------------- Generic functions ---------------------------- */ /* This function can be installed both in memory and file streams when checksum diff --git a/src/rio.h b/src/rio.h index 2d12c6cc7..0d485d454 100644 --- a/src/rio.h +++ b/src/rio.h @@ -61,15 +61,23 @@ struct _rio { /* Backend-specific vars. */ union { + /* In-memory buffer target. */ struct { sds ptr; off_t pos; } buffer; + /* Stdio file pointer target. */ struct { FILE *fp; off_t buffered; /* Bytes written since last fsync. 
*/ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* Multiple FDs target (used to write to N sockets). */ + struct { + int *fds; + int numfds; + off_t pos; + } fdset; } io; }; @@ -111,6 +119,7 @@ static inline off_t rioTell(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFdset(rio *r, int *fds, int numfds); size_t rioWriteBulkCount(rio *r, char prefix, int count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); -- cgit v1.2.1 From 75f0cd6520c73bc868717940ed583c4809ab30db Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 14 Oct 2014 10:11:26 +0200 Subject: Diskless replication: RDB -> slaves transfer draft implementation. --- src/rdb.c | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- src/rdb.h | 1 + src/redis.c | 1 + src/redis.h | 6 ++- src/replication.c | 81 +++++++++++++++++++++---------- 5 files changed, 200 insertions(+), 28 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index bd6d1e579..4d45c424f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -689,6 +689,32 @@ werr: return REDIS_ERR; } +/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix + * and a suffix to the generated RDB dump. The prefix is: + * + * $EOF:<40 bytes unguessable hex string>\r\n + * + * While the suffix is the 40 bytes hex string we announced in the prefix. + * This way processes receiving the payload can understand when it ends + * without doing any processing of the content. */ +int rdbSaveRioWithEOFMark(rio *rdb, int *error) { + char eofmark[REDIS_EOF_MARK_SIZE]; + + getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE); + if (error) *error = 0; + if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + if (rioWrite(rdb,"\r\n",2) == 0) goto werr; + if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + return REDIS_OK; + +werr: /* Write error. 
*/ + /* Set 'error' only if not already set by rdbSaveRio() call. */ + if (error && *error == 0) *error = errno; + return REDIS_ERR; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */ int rdbSave(char *filename) { char tmpfile[256]; @@ -1211,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return REDIS_ERR; /* Just to avoid warning */ } -/* A background saving child (BGSAVE) terminated its work. Handle this. */ -void backgroundSaveDoneHandler(int exitcode, int bysignal) { +/* A background saving child (BGSAVE) terminated its work. Handle this. + * This function covers the case of actual BGSAVEs. */ +void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); @@ -1242,7 +1269,113 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ - updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR); + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK); +} + +/* A background saving child (BGSAVE) terminated its work. Handle this. + * This function covers the case of RDB -> Salves socket transfers for + * diskless replication. 
*/ +void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { + if (!bysignal && exitcode == 0) { + redisLog(REDIS_NOTICE, + "Background RDB transfer terminated with success"); + } else if (!bysignal && exitcode != 0) { + redisLog(REDIS_WARNING, "Background transfer error"); + } else { + redisLog(REDIS_WARNING, + "Background transfer terminated by signal %d", bysignal); + } + server.rdb_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; + server.rdb_save_time_start = -1; + /* Possibly there are slaves waiting for a BGSAVE in order to be served + * (the first stage of SYNC is a bulk transfer of dump.rdb) */ + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET); +} + +/* When a background RDB saving/transfer terminates, call the right handler. */ +void backgroundSaveDoneHandler(int exitcode, int bysignal) { + switch(server.rdb_child_type) { + case REDIS_RDB_CHILD_TYPE_DISK: + backgroundSaveDoneHandlerDisk(exitcode,bysignal); + break; + case REDIS_RDB_CHILD_TYPE_SOCKET: + backgroundSaveDoneHandlerSocket(exitcode,bysignal); + break; + default: + redisPanic("Unknown RDB child type."); + break; + } +} + +/* Spawn an RDB child that writes the RDB to the sockets of the slaves + * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */ +int rdbSaveToSlavesSockets(void) { + int *fds; + int numfds; + listNode *ln; + listIter li; + pid_t childpid; + long long start; + + if (server.rdb_child_pid != -1) return REDIS_ERR; + + fds = zmalloc(sizeof(int)*listLength(server.slaves)); + numfds = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + fds[numfds++] = slave->fd; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } + } + + /* Fork ... 
*/ + start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + int retval; + rio slave_sockets; + + rioInitWithFdset(&slave_sockets,fds,numfds); + zfree(fds); + + closeListeningSockets(0); + redisSetProcTitle("redis-rdb-to-slaves"); + + retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + if (retval == REDIS_OK) { + size_t private_dirty = zmalloc_get_private_dirty(); + + if (private_dirty) { + redisLog(REDIS_NOTICE, + "RDB: %zu MB of memory used by copy-on-write", + private_dirty/(1024*1024)); + } + } + exitFromChild((retval == REDIS_OK) ? 0 : 1); + } else { + /* Parent */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + redisLog(REDIS_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid); + server.rdb_save_time_start = time(NULL); + server.rdb_child_pid = childpid; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET; + updateDictResizePolicy(); + zfree(fds); + return REDIS_OK; + } + return REDIS_OK; /* unreached */ } void saveCommand(redisClient *c) { diff --git a/src/rdb.h b/src/rdb.h index 54ee4e514..eb40d4993 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename); int rdbSaveBackground(char *filename); +int rdbSaveToSlavesSockets(void); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char *filename); int rdbSaveObject(rio *rdb, robj *o); diff --git a/src/redis.c b/src/redis.c index 3340ecef9..d8ba5675a 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1480,6 +1480,7 @@ void initServerConfig(void) { server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since 
EVER. */ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; + server.repl_diskless = REDIS_DEFAULT_RDB_DISKLESS; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index 5e756f0f0..d8f2b444c 100644 --- a/src/redis.h +++ b/src/redis.h @@ -96,6 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_REPL_TIMEOUT 60 #define REDIS_REPL_PING_SLAVE_PERIOD 10 #define REDIS_RUN_ID_SIZE 40 +#define REDIS_EOF_MARK_SIZE 40 #define REDIS_OPS_SEC_SAMPLES 16 #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */ #define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */ @@ -113,6 +114,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_DEFAULT_RDB_COMPRESSION 1 #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" +#define REDIS_DEFAULT_RDB_DISKLESS 0 +#define REIDS_DEFAULT_RDB_DISKLESS_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 @@ -796,6 +799,7 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ + int repl_diskless; /* Send RDB to slaves sockets directly. 
*/ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ @@ -1138,7 +1142,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc); -void updateSlavesWaitingBgsave(int bgsaveerr); +void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); void replicationHandleMasterDisconnection(void); void replicationCacheMaster(redisClient *c); diff --git a/src/replication.c b/src/replication.c index f4ac58f4d..ba39fddb1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -408,6 +408,28 @@ need_full_resync: return REDIS_ERR; } +/* Start a BGSAVE for replication goals, which is, selecting the disk or + * socket target depending on the configuration, and making sure that + * the script cache is flushed before to start. + * + * Returns REDIS_OK on success or REDIS_ERR otherwise. */ +int startBgsaveForReplication(void) { + int retval; + + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", + server.repl_diskless ? "slaves sockets" : "disk"); + + if (server.repl_diskless) + retval = rdbSaveToSlavesSockets(); + else + retval = rdbSaveBackground(server.rdb_filename); + + /* Flush the script cache, since we need that slave differences are + * accumulated without requiring slaves to match our cached scripts. */ + if (retval == REDIS_OK) replicationScriptCacheFlush(); + return retval; +} + /* SYNC and PSYNC command implemenation. 
*/ void syncCommand(redisClient *c) { /* ignore SYNC if already slave or in monitor mode */ @@ -465,7 +487,9 @@ void syncCommand(redisClient *c) { /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.rdb_child_pid != -1) { + if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) + { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. */ @@ -480,12 +504,7 @@ void syncCommand(redisClient *c) { } if (ln) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. - * - * Note that if we found a slave in WAIT_BGSAVE_END state, this - * means that the current child is of type - * REDIS_RDB_CHILD_TYPE_DISK, since the first slave in this state - * can only be added when an RDB save with disk target is started. */ + * another slave. Set the right state, and copy the buffer. */ copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); @@ -495,17 +514,31 @@ void syncCommand(redisClient *c) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } + } else if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) + { + /* There is an RDB child process but it is writing directly to + * children sockets. We need to wait for the next BGSAVE + * in order to synchronize. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - /* Ok we don't have a BGSAVE in progress, let's start one. 
*/ - redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { - redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplyError(c,"Unable to perform background save"); - return; + if (server.repl_diskless) { + /* Diskless replication RDB child is created inside + * replicationCron() since we want to delay its start a + * few seconds to wait for more slaves to arrive. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); + } else { + /* Ok we don't have a BGSAVE in progress, let's start one. */ + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); + if (startBgsaveForReplication() != REDIS_OK) { + redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); + addReplyError(c,"Unable to perform background save"); + return; + } + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } - c->replstate = REDIS_REPL_WAIT_BGSAVE_END; - /* Flush the script cache for the new slave. */ - replicationScriptCacheFlush(); } if (server.repl_disable_tcp_nodelay) @@ -644,10 +677,15 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { /* This function is called at the end of every background saving. * The argument bgsaveerr is REDIS_OK if the background saving succeeded * otherwise REDIS_ERR is passed to the function. + * The 'type' argument is the type of the child that terminated + * (if it had a disk or socket target). * * The goal of this function is to handle slaves waiting for a successful - * background saving in order to perform non-blocking synchronization. */ -void updateSlavesWaitingBgsave(int bgsaveerr) { + * background saving in order to perform non-blocking synchronization, and + * to schedule a new BGSAVE if there are slaves that attached while a + * BGSAVE was in progress, but it was not a good one for replication (no + * other slave was accumulating differences). 
*/ +void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; int startbgsave = 0; listIter li; @@ -687,12 +725,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr) { } } if (startbgsave) { - /* Since we are starting a new background save for one or more slaves, - * we flush the Replication Script Cache to use EVAL to propagate every - * new EVALSHA for the first time, since all the new slaves don't know - * about previous scripts. */ - replicationScriptCacheFlush(); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { + if (startBgsaveForReplication() != REDIS_OK) { listIter li; listRewind(server.slaves,&li); -- cgit v1.2.1 From 1cd0d26c63c755aaac4997e852c4041a0237a395 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 14 Oct 2014 15:29:07 +0200 Subject: Diskless replication: parent-child pipe and a few TODOs. --- src/rdb.c | 37 +++++++++++++++++++++++++++++++++---- src/redis.h | 2 ++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 4d45c424f..8c50610e4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1274,7 +1274,13 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { /* A background saving child (BGSAVE) terminated its work. Handle this. * This function covers the case of RDB -> Salves socket transfers for - * diskless replication. */ + * diskless replication. + * + * TODO: + * 1) Read from the pipe the set of IDs which are fine. This should be + * just an uint32_t with the len, and N pid_t elements. + * 2) Close all the slaves in state REDIS_REPL_WAIT_BGSAVE_END but are + * not reported as "transfer ok" by the child. 
*/ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, @@ -1288,8 +1294,18 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { server.rdb_child_pid = -1; server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; server.rdb_save_time_start = -1; - /* Possibly there are slaves waiting for a BGSAVE in order to be served - * (the first stage of SYNC is a bulk transfer of dump.rdb) */ + /* Read the set of slave client IDs that currently received the full + * RDB payload, closing all the slaves which are not among the ones + * listed. */ + + /* TODO ... */ + + /* Close the pipe FDs. */ + + /* TODO ... */ + + /* We can continue the replication process with all the slaves that + * correctly received the full payload. */ updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET); } @@ -1317,9 +1333,19 @@ int rdbSaveToSlavesSockets(void) { listIter li; pid_t childpid; long long start; + int pipefds[2]; if (server.rdb_child_pid != -1) return REDIS_ERR; + /* Before to fork, create a pipe that will be used in order to + * send back to the parent the IDs of the slaves that successfully + * received all the writes. */ + if (pipe(pipefds) == -1) return REDIS_ERR; + server.rdb_pipe_read_result_from_child = pipefds[0]; + server.rdb_pipe_write_result_to_parent = pipefds[1]; + + /* Collect the file descriptors of the slaves we want to transfer + * the RDB to, which are i WAIT_BGSAVE_START state. */ fds = zmalloc(sizeof(int)*listLength(server.slaves)); numfds = 0; @@ -1333,7 +1359,7 @@ int rdbSaveToSlavesSockets(void) { } } - /* Fork ... */ + /* Create the child process. 
*/ start = ustime(); if ((childpid = fork()) == 0) { /* Child */ @@ -1365,6 +1391,9 @@ int rdbSaveToSlavesSockets(void) { if (childpid == -1) { redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno)); + zfree(fds); + close(pipefds[0]); + close(pipefds[1]); return REDIS_ERR; } redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid); diff --git a/src/redis.h b/src/redis.h index d8f2b444c..e55814362 100644 --- a/src/redis.h +++ b/src/redis.h @@ -775,6 +775,8 @@ struct redisServer { int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ + int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ + int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ -- cgit v1.2.1 From 2a436aaeabcc9cd5ad625613e7882df9495ab04b Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 14 Oct 2014 17:19:34 +0200 Subject: rio.c fdset target: tolerate (and report) a subset of FDs in error. Fdset target is used when we want to write an RDB file directly to slave's sockets. In this setup as long as there is a single slave that is still receiving our payload, we want to continue sending instead of aborting. However rio calls should abort if no FD is ok. Also we want the errors reported so that we can signal the parent who is ok and who is broken, so there is a new set of integers with the state of each fd. Zero is ok, non-zero is the errno of the failure, if available, or a generic EIO. 
--- src/rio.c | 22 ++++++++++++++++++++-- src/rio.h | 3 ++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/rio.c b/src/rio.c index 88f6781e0..dbda4c668 100644 --- a/src/rio.c +++ b/src/rio.c @@ -144,7 +144,9 @@ void rioInitWithFile(rio *r, FILE *fp) { /* ------------------- File descriptors set implementation ------------------- */ -/* Returns 1 or 0 for success/failure. */ +/* Returns 1 or 0 for success/failure. + * The function returns success as long as we are able to correctly write + * to at least one file descriptor. */ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { size_t retval; int j; @@ -155,10 +157,21 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { * the TCP socket. */ while(len) { size_t count = len < 1024 ? len : 1024; + int broken = 0; for (j = 0; j < r->io.fdset.numfds; j++) { + if (r->io.fdset.state[j] != 0) { + /* Skip FDs alraedy in error. */ + broken++; + continue; + } retval = write(r->io.fdset.fds[j],p,count); - if (retval != count) return 0; + if (retval != count) { + /* Mark this FD as broken. */ + r->io.fdset.state[j] = errno; + if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; + } } + if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. 
*/ p += count; len -= count; r->io.fdset.pos += count; @@ -191,15 +204,20 @@ static const rio rioFdsetIO = { }; void rioInitWithFdset(rio *r, int *fds, int numfds) { + int j; + *r = rioFdsetIO; r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + r->io.fdset.state = zmalloc(sizeof(int)*numfds); memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; r->io.fdset.numfds = numfds; r->io.fdset.pos = 0; } void rioFreeFdset(rio *r) { zfree(r->io.fdset.fds); + zfree(r->io.fdset.state); } /* ---------------------------- Generic functions ---------------------------- */ diff --git a/src/rio.h b/src/rio.h index 0d485d454..b73f4a050 100644 --- a/src/rio.h +++ b/src/rio.h @@ -74,7 +74,8 @@ struct _rio { } file; /* Multiple FDs target (used to write to N sockets). */ struct { - int *fds; + int *fds; /* File descriptors. */ + int *state; /* Error state of each fd. 0 (if ok) or errno. */ int numfds; off_t pos; } fdset; -- cgit v1.2.1 From fbe7545545269ab47f98be70ca41435a30043fe2 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 15 Oct 2014 09:46:45 +0200 Subject: Diskless replication: child writes report to parent. --- src/rdb.c | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index 8c50610e4..f2eae0234 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1328,6 +1328,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */ int rdbSaveToSlavesSockets(void) { int *fds; + uint64_t *clientids; int numfds; listNode *ln; listIter li; @@ -1347,6 +1348,10 @@ int rdbSaveToSlavesSockets(void) { /* Collect the file descriptors of the slaves we want to transfer * the RDB to, which are i WAIT_BGSAVE_START state. */ fds = zmalloc(sizeof(int)*listLength(server.slaves)); + /* We also allocate an array of corresponding client IDs. 
This will + * be useful for the child process in order to build the report + * (sent via unix pipe) that will be sent to the parent. */ + clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves)); numfds = 0; listRewind(server.slaves,&li); @@ -1354,6 +1359,7 @@ int rdbSaveToSlavesSockets(void) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + clientids[numfds] = slave->id; fds[numfds++] = slave->fd; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } @@ -1381,10 +1387,44 @@ int rdbSaveToSlavesSockets(void) { "RDB: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } + + /* If we are returning OK, at least one slave was served + * with the RDB file as expected, so we need to send a report + * to the parent via the pipe. The format of the message is: + * just an array of uint64_t integers (to avoid alignment concerns), + * where the first element is the number of uint64_t elements + * that follows, representing slave client IDs that were + * successfully served. */ + void *msg = zmalloc(sizeof(uint64_t)*(1+numfds)); + uint64_t *len = msg; + uint64_t *ids = len+1; + int j, msglen; + + *len = 0; + for (j = 0; j < numfds; j++) { + /* No error? Add it. */ + if (slave_sockets.io.fdset.state[j] == 0) { + ids[*len] = clientids[j]; + (*len)++; + } + } + + /* Write the message to the parent. If we have no good slaves or + * we are unable to transfer the message to the parent, we exit + * with an error so that the parent will abort the replication + * process with all the childre that were waiting. */ + msglen = sizeof(uint64_t)*(1+(*len)); + if (*len == 0 || + write(server.rdb_pipe_write_result_to_parent,msg,msglen) + != msglen) + { + retval = REDIS_ERR; + } } exitFromChild((retval == REDIS_OK) ? 0 : 1); } else { /* Parent */ + zfree(clientids); /* Not used by parent. Free ASAP. 
*/ server.stat_fork_time = ustime()-start; server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); -- cgit v1.2.1 From 7a1e0d9898a8b2a6c36ef12b1d1f9f0b6dece4e0 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 15 Oct 2014 11:35:00 +0200 Subject: Diskless replication: read report from child. --- src/rdb.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 15 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index f2eae0234..bcabd0aed 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1274,14 +1274,10 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { /* A background saving child (BGSAVE) terminated its work. Handle this. * This function covers the case of RDB -> Salves socket transfers for - * diskless replication. - * - * TODO: - * 1) Read from the pipe the set of IDs which are fine. This should be - * just an uint32_t with the len, and N pid_t elements. - * 2) Close all the slaves in state REDIS_REPL_WAIT_BGSAVE_END but are - * not reported as "transfer ok" by the child. */ + * diskless replication. */ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { + uint64_t *ok_slaves; + if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background RDB transfer terminated with success"); @@ -1294,18 +1290,64 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { server.rdb_child_pid = -1; server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; server.rdb_save_time_start = -1; - /* Read the set of slave client IDs that currently received the full - * RDB payload, closing all the slaves which are not among the ones - * listed. */ - - /* TODO ... */ - /* Close the pipe FDs. */ + /* If the child returns an OK exit code, read the set of slave client + * IDs that received the full RDB payload, closing all the slaves + * which are not among the ones listed. 
+ * + * If the process returned an error, consider the list of slaves that + * can continue to be emtpy, so that it's just a speical case of the + * normal code path. */ + ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ + ok_slaves = 0; + if (!bysignal && exitcode == 0) { + int readlen = sizeof(uint64_t); + + if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) != + readlen) + { + readlen = ok_slaves[0]*sizeof(uint64_t); + + /* Make space for enough elements as specified by the first + * uint64_t element in the array. */ + ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen); + if (readlen && + read(server.rdb_pipe_read_result_from_child, ok_slaves+1, + readlen) != readlen) + { + ok_slaves[0] = 0; + } + } + } - /* TODO ... */ + close(server.rdb_pipe_read_result_from_child); + close(server.rdb_pipe_write_result_to_parent); /* We can continue the replication process with all the slaves that - * correctly received the full payload. */ + * correctly received the full payload. Others are terminated. */ + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + uint64_t j; + + for (j = 1; j < ok_slaves[0]; j++) { + if (slave->id == ok_slaves[j]) break; /* Found in the OK list. */ + } + if (j == ok_slaves[0]) { + redisLog(REDIS_WARNING, + "Closing slave %llu: child->slave RDB transfer failed.", + slave->id); + freeClient(slave); + } + } + } + zfree(ok_slaves); + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET); } -- cgit v1.2.1 From 3730d118a3ee0ddfc640ba1c8cf77a33dff66a35 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 15 Oct 2014 15:31:19 +0200 Subject: Diskless replication: handle putting the slave online. 
--- src/replication.c | 88 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/src/replication.c b/src/replication.c index ba39fddb1..02d4d50b1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -611,6 +611,29 @@ void replconfCommand(redisClient *c) { addReply(c,shared.ok); } +/* This function puts a slave in the online state, and should be called just + * after a slave received the RDB file for the initial synchronization, and + * we are finally ready to send the incremental stream of commands. + * + * It does a few things: + * + * 1) Put the slave in ONLINE state. + * 2) Make sure the writable event is re-installed, since calling the SYNC + * command disables it, so that we can accumulate output buffer without + * sending it to the slave. + * 3) Update the count of good slaves. */ +void putSlaveOnline(redisClient *slave) { + slave->replstate = REDIS_REPL_ONLINE; + slave->repl_ack_time = server.unixtime; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave) == AE_ERR) { + redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); + freeClient(slave); + return; + } + refreshGoodSlavesCount(); +} + void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); @@ -661,16 +684,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - slave->replstate = REDIS_REPL_ONLINE; - slave->repl_ack_time = server.unixtime; - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, - sendReplyToClient, slave) == AE_ERR) { - redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); - freeClient(slave); - return; - } - refreshGoodSlavesCount(); - redisLog(REDIS_NOTICE,"Synchronization with slave 
succeeded"); + putSlaveOnline(slave); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)"); } } @@ -700,27 +715,38 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; - if (bgsaveerr != REDIS_OK) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); - continue; - } - if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || - redis_fstat(slave->repldbfd,&buf) == -1) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); - continue; - } - slave->repldboff = 0; - slave->repldbsize = buf.st_size; - slave->replstate = REDIS_REPL_SEND_BULK; - slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", - (unsigned long long) slave->repldbsize); - - aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { - freeClient(slave); - continue; + /* If this was an RDB on disk save, we have to prepare to send + * the RDB from disk to the slave socket. Otherwise if this was + * already an RDB -> Slaves socket transfer, used in the case of + * diskless replication, our work is trivial, we can just put + * the slave online. */ + if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { + putSlaveOnline(slave); + redisLog(REDIS_NOTICE, + "Synchronization with slave succeeded (socket)"); + } else { + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || + redis_fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. 
Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", + (unsigned long long) slave->repldbsize); + + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + freeClient(slave); + continue; + } } } } -- cgit v1.2.1 From e9e007555e03af906544c0870faaaba1e1d2d0a2 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 09:03:52 +0200 Subject: Diskless replication: trigger diskless RDB transfer if needed. --- src/redis.h | 2 +- src/replication.c | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/redis.h b/src/redis.h index e55814362..fd491b453 100644 --- a/src/redis.h +++ b/src/redis.h @@ -115,7 +115,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" #define REDIS_DEFAULT_RDB_DISKLESS 0 -#define REIDS_DEFAULT_RDB_DISKLESS_DELAY 5 +#define REDIS_DEFAULT_RDB_DISKLESS_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 diff --git a/src/replication.c b/src/replication.c index 02d4d50b1..11ed3a912 100644 --- a/src/replication.c +++ b/src/replication.c @@ -212,7 +212,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } /* Write the command to every slave. */ - listRewind(slaves,&li); + listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; @@ -1941,6 +1941,41 @@ void replicationCron(void) { replicationScriptCacheFlush(); } + /* If we are using diskless replication and there are slaves waiting + * in WAIT_BGSAVE_START state, check if enough seconds elapsed and + * start one. 
*/ + if (server.repl_diskless && server.rdb_child_pid == -1 && + server.aof_child_pid == -1) + { + time_t idle, max_idle = 0; + int slaves_waiting = 0; + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + idle = server.unixtime - slave->lastinteraction; + if (idle > max_idle) max_idle = idle; + slaves_waiting++; + } + } + + if (slaves_waiting && max_idle > REDIS_DEFAULT_RDB_DISKLESS_DELAY) { + /* Let's start a BGSAVE with disk target. */ + if (startBgsaveForReplication() == REDIS_OK) { + /* It started! We need to change the state of slaves + * from WAIT_BGSAVE_START to WAIT_BGSAVE_END. */ + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } + } + } + } + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); } -- cgit v1.2.1 From 5f8360eb21e798f468162bed76c839a8ed28f480 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 10:00:46 +0200 Subject: Diskless replication flag renamed repl_diskless -> repl_diskless_sync. --- src/redis.c | 2 +- src/redis.h | 6 +++--- src/replication.c | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/redis.c b/src/redis.c index d8ba5675a..7aa12fc60 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1480,7 +1480,7 @@ void initServerConfig(void) { server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; - server.repl_diskless = REDIS_DEFAULT_RDB_DISKLESS; + server.repl_diskless_sync = REDIS_DEFAULT_RDB_DISKLESS_SYNC; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index fd491b453..b5fdf9be1 100644 --- a/src/redis.h +++ b/src/redis.h @@ -114,8 +114,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_DEFAULT_RDB_COMPRESSION 1 #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" -#define REDIS_DEFAULT_RDB_DISKLESS 0 -#define REDIS_DEFAULT_RDB_DISKLESS_DELAY 5 +#define REDIS_DEFAULT_RDB_DISKLESS_SYNC 0 +#define REDIS_DEFAULT_RDB_DISKLESS_SYNC_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 @@ -801,7 +801,7 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ diff --git a/src/replication.c b/src/replication.c index 11ed3a912..004b7784c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -417,9 +417,9 @@ int startBgsaveForReplication(void) { int retval; redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", - server.repl_diskless ? "slaves sockets" : "disk"); + server.repl_diskless_sync ? 
"slaves sockets" : "disk"); - if (server.repl_diskless) + if (server.repl_diskless_sync) retval = rdbSaveToSlavesSockets(); else retval = rdbSaveBackground(server.rdb_filename); @@ -523,7 +523,7 @@ void syncCommand(redisClient *c) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - if (server.repl_diskless) { + if (server.repl_diskless_sync) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ @@ -1944,7 +1944,7 @@ void replicationCron(void) { /* If we are using diskless replication and there are slaves waiting * in WAIT_BGSAVE_START state, check if enough seconds elapsed and * start one. */ - if (server.repl_diskless && server.rdb_child_pid == -1 && + if (server.repl_diskless_sync && server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; @@ -1962,7 +1962,7 @@ void replicationCron(void) { } } - if (slaves_waiting && max_idle > REDIS_DEFAULT_RDB_DISKLESS_DELAY) { + if (slaves_waiting && max_idle > REDIS_DEFAULT_RDB_DISKLESS_SYNC_DELAY) { /* Let's start a BGSAVE with disk target. */ if (startBgsaveForReplication() == REDIS_OK) { /* It started! We need to change the state of slaves -- cgit v1.2.1 From 42951ab301bdaedaa2d1eac1096f36a95ab79c12 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 10:15:18 +0200 Subject: Diskless replication: trigger a BGSAVE after a config change. If we turn from diskless to disk-based replication via CONFIG SET, we need a way to start a BGSAVE if there are slaves alerady waiting for a BGSAVE to start. Normally with disk-based replication we do it as soon as the previous child exits, but when there is a configuration change via CONFIG SET, we may have slaves in WAIT_BGSAVE_START state without an RDB background process currently active. 
--- src/replication.c | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/replication.c b/src/replication.c index 004b7784c..f27174653 100644 --- a/src/replication.c +++ b/src/replication.c @@ -689,17 +689,20 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { } } -/* This function is called at the end of every background saving. - * The argument bgsaveerr is REDIS_OK if the background saving succeeded - * otherwise REDIS_ERR is passed to the function. - * The 'type' argument is the type of the child that terminated - * (if it had a disk or socket target). +/* This function is called at the end of every background saving, + * or when the replication RDB transfer strategy is modified from + * disk to socket or the other way around. * * The goal of this function is to handle slaves waiting for a successful * background saving in order to perform non-blocking synchronization, and * to schedule a new BGSAVE if there are slaves that attached while a * BGSAVE was in progress, but it was not a good one for replication (no - * other slave was accumulating differences). */ + * other slave was accumulating differences). + * + * The argument bgsaveerr is REDIS_OK if the background saving succeeded + * otherwise REDIS_ERR is passed to the function. + * The 'type' argument is the type of the child that terminated + * (if it had a disk or socket target). */ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; int startbgsave = 0; @@ -722,8 +725,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { * the slave online. 
*/ if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { putSlaveOnline(slave); - redisLog(REDIS_NOTICE, - "Synchronization with slave succeeded (socket)"); + redisLog(REDIS_NOTICE, + "Synchronization with slave succeeded (socket)"); } else { if (bgsaveerr != REDIS_OK) { freeClient(slave); @@ -1943,10 +1946,12 @@ void replicationCron(void) { /* If we are using diskless replication and there are slaves waiting * in WAIT_BGSAVE_START state, check if enough seconds elapsed and - * start one. */ - if (server.repl_diskless_sync && server.rdb_child_pid == -1 && - server.aof_child_pid == -1) - { + * start a BGSAVE. + * + * This code is also useful to trigger a BGSAVE if the diskless + * replication was turned off with CONFIG SET, while there were already + * slaves in WAIT_BGSAVE_START state. */ + if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; int slaves_waiting = 0; listNode *ln; -- cgit v1.2.1 From 43ae6064302ecba5380c149334f78033137aa850 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 10:22:02 +0200 Subject: Diskless replication: redis.conf and CONFIG SET/GET support. 
--- src/config.c | 14 +++++++++++++- src/redis.c | 2 +- src/redis.h | 4 ++-- src/replication.c | 3 ++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/config.c b/src/config.c index 43507000f..1b7b16f67 100644 --- a/src/config.c +++ b/src/config.c @@ -270,6 +270,10 @@ void loadServerConfigFromString(char *config) { if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) { + if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) { long long size = memtoll(argv[1],NULL); if (size <= 0) { @@ -284,7 +288,7 @@ void loadServerConfigFromString(char *config) { goto loaderr; } } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { - server.masterauth = zstrdup(argv[1]); + server.masterauth = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) { if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -911,6 +915,11 @@ void configSetCommand(redisClient *c) { if (yn == -1) goto badfmt; server.repl_disable_tcp_nodelay = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.repl_diskless_sync = yn; } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; @@ -1067,6 +1076,8 @@ void configGetCommand(redisClient *c) { config_get_bool_field("activerehashing", server.activerehashing); config_get_bool_field("repl-disable-tcp-nodelay", server.repl_disable_tcp_nodelay); + config_get_bool_field("repl-diskless-sync", + server.repl_diskless_sync); config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); 
config_get_bool_field("aof-load-truncated", @@ -1792,6 +1803,7 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,REDIS_DEFAULT_REPL_BACKLOG_SIZE); rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT); rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY); + rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC); rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE); rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG); diff --git a/src/redis.c b/src/redis.c index 7aa12fc60..61a103aaf 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1480,7 +1480,7 @@ void initServerConfig(void) { server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; - server.repl_diskless_sync = REDIS_DEFAULT_RDB_DISKLESS_SYNC; + server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index b5fdf9be1..8583beae6 100644 --- a/src/redis.h +++ b/src/redis.h @@ -114,8 +114,8 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define REDIS_DEFAULT_RDB_COMPRESSION 1 #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" -#define REDIS_DEFAULT_RDB_DISKLESS_SYNC 0 -#define REDIS_DEFAULT_RDB_DISKLESS_SYNC_DELAY 5 +#define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0 +#define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 diff --git a/src/replication.c b/src/replication.c index f27174653..927b99174 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1967,7 +1967,8 @@ void replicationCron(void) { } } - if (slaves_waiting && max_idle > REDIS_DEFAULT_RDB_DISKLESS_SYNC_DELAY) { + if (slaves_waiting && max_idle > REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY) + { /* Let's start a BGSAVE with disk target. */ if (startBgsaveForReplication() == REDIS_OK) { /* It started! We need to change the state of slaves -- cgit v1.2.1 From 5ee2ccf48e75012b2cabefd89f40bd09a1f10258 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 17:09:29 +0200 Subject: Diskless replication: EOF: streaming support slave side. --- src/replication.c | 67 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/src/replication.c b/src/replication.c index 927b99174..28ecd0bd4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -818,6 +818,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); + /* Static vars used to hold the EOF mark, and the last bytes received + * form the server: when they match, we reached the end of the transfer. */ + static char eofmark[REDIS_RUN_ID_SIZE]; + static char lastbytes[REDIS_RUN_ID_SIZE]; + static int usemark = 0; + /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. 
*/ if (server.repl_transfer_size == -1) { @@ -843,16 +849,41 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } - server.repl_transfer_size = strtol(buf+1,NULL,10); - redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + + /* There are two possible forms for the bulk payload. One is the + * usual $ bulk format. The other is used for diskless transfers + * when the master does not know beforehand the size of the file to + * transfer. In the latter case, the following format is used: + * + * $EOF:<40 bytes delimiter> + * + * At the end of the file the announced delimiter is transmitted. The + * delimiter is long and random enough that the probability of a + * collision with the actual file content can be ignored. */ + if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { + usemark = 1; + memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); + memset(lastbytes,0,REDIS_RUN_ID_SIZE); + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving streamed RDB from master"); + } else { + usemark = 0; + server.repl_transfer_size = strtol(buf+1,NULL,10); + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving %lld bytes from master", + (long long) server.repl_transfer_size); + } return; } /* Read bulk data */ - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); + if (usemark) { + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? 
left : (signed)sizeof(buf); + } else { + readlen = sizeof(buf); + } + nread = read(fd,buf,readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", @@ -860,6 +891,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { replicationAbortSyncTransfer(); return; } + + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our delimiter. */ + if (nread >= REDIS_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); + } else { + int rem = REDIS_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; + } + server.repl_transfer_lastio = server.unixtime; if (write(server.repl_transfer_fd,buf,nread) != nread) { redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); @@ -881,7 +929,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Check if the transfer is now complete */ - if (server.repl_transfer_read == server.repl_transfer_size) { + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + + if (eof_reached) { if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); -- cgit v1.2.1 From f70369335d85b9fed0c4192b79e6a6bb5477ab7b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 16 Oct 2014 17:35:11 +0200 Subject: syncReadLine(): actually enforce buffer size limits. 
--- src/syncio.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/syncio.c b/src/syncio.c index 8810a842c..ac2a4a373 100644 --- a/src/syncio.c +++ b/src/syncio.c @@ -139,6 +139,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) { *ptr = '\0'; nread++; } + size--; } return nread; } -- cgit v1.2.1 From 80f7f63b64937df4d7f0bb53450ba281a5dd26a4 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 10:21:18 +0200 Subject: Diskless replication: don't enter the read-payload branch forever. --- src/replication.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/replication.c b/src/replication.c index 28ecd0bd4..35a38e2e3 100644 --- a/src/replication.c +++ b/src/replication.c @@ -864,6 +864,9 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { usemark = 1; memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); memset(lastbytes,0,REDIS_RUN_ID_SIZE); + /* Set any repl_transfer_size to avoid entering this code path + * at the next call. */ + server.repl_transfer_size = 0; redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: receiving streamed RDB from master"); } else { -- cgit v1.2.1 From 0c5a06f6bb8aaa6d15ea1a363ed19b637934579a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 10:22:29 +0200 Subject: Diskless replication: swap inverted branches to compute read len. --- src/replication.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index 35a38e2e3..050abfd03 100644 --- a/src/replication.c +++ b/src/replication.c @@ -881,10 +881,10 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Read bulk data */ if (usemark) { + readlen = sizeof(buf); + } else { left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? 
left : (signed)sizeof(buf); - } else { - readlen = sizeof(buf); } nread = read(fd,buf,readlen); -- cgit v1.2.1 From 25a3d9965e0a130705d974ae6a8a6a51e7fabe95 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 10:23:11 +0200 Subject: Diskless replication: remove 40 bytes EOF mark from end of RDB file. --- src/replication.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/replication.c b/src/replication.c index 050abfd03..7dd4a4436 100644 --- a/src/replication.c +++ b/src/replication.c @@ -918,6 +918,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } server.repl_transfer_read += nread; + /* Delete the last 40 bytes from the file if we reached EOF. */ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, + server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1) + { + redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + goto error; + } + } + /* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. */ -- cgit v1.2.1 From 4b16263bd9e7b647ecc3d864481eff3de913a229 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 10:23:44 +0200 Subject: Diskless replication: don't send "\n" pings to slaves. This is useful for normal replication in order to refresh the slave when we are persisting on disk, but for diskless replication the child is already receiving data while in WAIT_BGSAVE_END state. 
--- src/replication.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 7dd4a4436..fa5ed87d7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1950,7 +1950,9 @@ void replicationCron(void) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || - slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && + server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) + { if (write(slave->fd, "\n", 1) == -1) { /* Don't worry, it's just a ping. */ } -- cgit v1.2.1 From b1337b15b6b090315d884d7372eea344926ae95b Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 10:43:56 +0200 Subject: Diskless replication: Various fixes to backgroundSaveDoneHandlerSocket() --- src/rdb.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index bcabd0aed..45beae14d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1299,11 +1299,11 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { * can continue to be emtpy, so that it's just a speical case of the * normal code path. */ ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ - ok_slaves = 0; + ok_slaves[0] = 0; if (!bysignal && exitcode == 0) { int readlen = sizeof(uint64_t); - if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) != + if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) == readlen) { readlen = ok_slaves[0]*sizeof(uint64_t); @@ -1335,14 +1335,18 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { uint64_t j; - for (j = 1; j < ok_slaves[0]; j++) { - if (slave->id == ok_slaves[j]) break; /* Found in the OK list. */ + for (j = 0; j < ok_slaves[0]; j++) { + if (slave->id == ok_slaves[j+1]) break; /* Found in OK list. 
*/ } if (j == ok_slaves[0]) { redisLog(REDIS_WARNING, "Closing slave %llu: child->slave RDB transfer failed.", slave->id); freeClient(slave); + } else { + redisLog(REDIS_WARNING, + "Slave %llu correctly received the streamed RDB file.", + slave->id); } } } -- cgit v1.2.1 From 10aafdad56fa79bd7f95d9b190054b2e56b6cddd Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 11:36:12 +0200 Subject: Diskless replication: rio fdset target now supports buffering. To perform a socket write() for each RDB rio API write call was extremely inefficient, so now rio has minimal buffering capabilities. Writes are accumulated into a buffer and only when a given limit is reached are they actually written to the N slaves FDs. Trivia: rio lacked support for buffering since our targets were: 1) Memory buffers. 2) C standard I/O. Both were buffered already. --- src/rdb.c | 3 +++ src/rio.c | 48 +++++++++++++++++++++++++++++++++++++++++++++++- src/rio.h | 6 ++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index 45beae14d..c6a1ec691 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1425,6 +1425,9 @@ int rdbSaveToSlavesSockets(void) { redisSetProcTitle("redis-rdb-to-slaves"); retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0) + retval = REDIS_ERR; + if (retval == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); diff --git a/src/rio.c b/src/rio.c index dbda4c668..5153ed28e 100644 --- a/src/rio.c +++ b/src/rio.c @@ -78,10 +78,18 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + REDIS_NOTUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer.
*/ +} + static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, + rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -124,10 +132,17 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} + static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -146,11 +161,29 @@ void rioInitWithFile(rio *r, FILE *fp) { /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write - * to at least one file descriptor. */ + * to at least one file descriptor. + * + * When buf is NULL and len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). */ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { size_t retval; int j; unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while below if we don't flush.
*/ + if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } /* Write in little chunchs so that when there are big writes we * parallelize while the kernel is sending data in background to @@ -176,6 +209,8 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { len -= count; r->io.fdset.pos += count; } + + if (doflush) sdsclear(r->io.fdset.buf); return 1; } @@ -192,10 +227,19 @@ static off_t rioFdsetTell(rio *r) { return r->io.fdset.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + static const rio rioFdsetIO = { rioFdsetRead, rioFdsetWrite, rioFdsetTell, + rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -213,11 +257,13 @@ void rioInitWithFdset(rio *r, int *fds, int numfds) { for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; r->io.fdset.numfds = numfds; r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); } void rioFreeFdset(rio *r) { zfree(r->io.fdset.fds); zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); } /* ---------------------------- Generic functions ---------------------------- */ diff --git a/src/rio.h b/src/rio.h index b73f4a050..e5fa0cd33 100644 --- a/src/rio.h +++ b/src/rio.h @@ -43,6 +43,7 @@ struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); + int (*flush)(struct _rio *); /* The update_cksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. 
The method should be * designed so that can be called with the current checksum, and the buf @@ -78,6 +79,7 @@ struct _rio { int *state; /* Error state of each fd. 0 (if ok) or errno. */ int numfds; off_t pos; + sds buf; } fdset; } io; }; @@ -118,6 +120,10 @@ static inline off_t rioTell(rio *r) { return r->tell(r); } +static inline int rioFlush(rio *r) { + return r->flush(r); +} + void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithFdset(rio *r, int *fds, int numfds); -- cgit v1.2.1 From 74f90c61232859f35db4eabf5b0bf1c8e4123bf0 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 16:30:11 +0200 Subject: anet.c: API to set sockets back to blocking mode. --- src/anet.c | 21 +++++++++++++++++---- src/anet.h | 1 + 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/anet.c b/src/anet.c index e3c155940..9be32fda4 100644 --- a/src/anet.c +++ b/src/anet.c @@ -57,24 +57,37 @@ static void anetSetError(char *err, const char *fmt, ...) va_end(ap); } -int anetNonBlock(char *err, int fd) -{ +int anetSetBlock(char *err, int fd, int non_block) { int flags; - /* Set the socket non-blocking. + /* Set the socket blocking (if non_block is zero) or non-blocking. * Note that fcntl(2) for F_GETFL and F_SETFL can't be * interrupted by a signal. */ if ((flags = fcntl(fd, F_GETFL)) == -1) { anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno)); return ANET_ERR; } - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + + if (non_block) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) == -1) { anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno)); return ANET_ERR; } return ANET_OK; } +int anetNonBlock(char *err, int fd) { + return anetSetBlock(err,fd,1); +} + +int anetBlock(char *err, int fd) { + return anetSetBlock(err,fd,0); +} + /* Set TCP keep alive option to detect dead peers. 
The interval option * is only used for Linux as we are using Linux-specific APIs to set * the probe send time, interval, and count. */ diff --git a/src/anet.h b/src/anet.h index 5191c4b69..c9d54f4a7 100644 --- a/src/anet.h +++ b/src/anet.h @@ -62,6 +62,7 @@ int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port) int anetUnixAccept(char *err, int serversock); int anetWrite(int fd, char *buf, int count); int anetNonBlock(char *err, int fd); +int anetBlock(char *err, int fd); int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); -- cgit v1.2.1 From 525c488f639672e5b36cc67ede857d25a39c016d Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 16:45:48 +0200 Subject: rio fdset target: handle short writes. While the socket is set in blocking mode, we still can get short writes writing to a socket. --- src/rdb.c | 6 ++++++ src/replication.c | 1 + src/rio.c | 13 +++++++++++-- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index c6a1ec691..b8e02b021 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1347,6 +1347,8 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { redisLog(REDIS_WARNING, "Slave %llu correctly received the streamed RDB file.", slave->id); + /* Restore the socket as non-blocking. */ + anetNonBlock(NULL,slave->fd); } } } @@ -1408,6 +1410,10 @@ int rdbSaveToSlavesSockets(void) { clientids[numfds] = slave->id; fds[numfds++] = slave->fd; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the children returns (since duped socket + * will share the O_NONBLOCK attribute with the parent).
*/ + anetBlock(NULL,slave->fd); } } diff --git a/src/replication.c b/src/replication.c index fa5ed87d7..ea8265e38 100644 --- a/src/replication.c +++ b/src/replication.c @@ -888,6 +888,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } nread = read(fd,buf,readlen); + printf("NREAD %d (%d)\n", (int)nread, (int)readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); diff --git a/src/rio.c b/src/rio.c index 5153ed28e..3513e1889 100644 --- a/src/rio.c +++ b/src/rio.c @@ -197,8 +197,17 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { broken++; continue; } - retval = write(r->io.fdset.fds[j],p,count); - if (retval != count) { + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) break; + nwritten += retval; + } + + if (nwritten != count) { /* Mark this FD as broken. */ r->io.fdset.state[j] = errno; if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; -- cgit v1.2.1 From fd112f52dce71e1ebd3fc3ffe1b7d521c628877e Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 17:02:42 +0200 Subject: rio.c fdset write() method fixed: wrong type for return value. --- src/rio.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rio.c b/src/rio.c index 3513e1889..2083486c2 100644 --- a/src/rio.c +++ b/src/rio.c @@ -167,7 +167,7 @@ void rioInitWithFile(rio *r, FILE *fp) { * if there is some pending buffer, so this function is also used in order * to implement rioFdsetFlush(). 
*/ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { - size_t retval; + ssize_t retval; int j; unsigned char *p = (unsigned char*) buf; int doflush = (buf == NULL && len == 0); -- cgit v1.2.1 From 456003af25fc7d78867116749a2014611f3e4f64 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 17:11:46 +0200 Subject: Diskless replication: less debugging printfs around. --- src/replication.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index ea8265e38..fa5ed87d7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -888,7 +888,6 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } nread = read(fd,buf,readlen); - printf("NREAD %d (%d)\n", (int)nread, (int)readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); -- cgit v1.2.1 From 2309f15d89e94babb2e9ef29225a40d822828379 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 22 Oct 2014 15:23:21 +0200 Subject: anet.c: new API anetSendTimeout(). --- src/anet.c | 15 +++++++++++++++ src/anet.h | 1 + 2 files changed, 16 insertions(+) diff --git a/src/anet.c b/src/anet.c index 9be32fda4..1e5d85495 100644 --- a/src/anet.c +++ b/src/anet.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -178,6 +179,20 @@ int anetTcpKeepAlive(char *err, int fd) return ANET_OK; } +/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified + * number of milliseconds, or disable it if the 'ms' argument is zero. 
*/ +int anetSendTimeout(char *err, int fd, long long ms) { + struct timeval tv; + + tv.tv_sec = ms/1000; + tv.tv_usec = (ms%1000)*1000; + if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) { + anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + /* anetGenericResolve() is called by anetResolve() and anetResolveIP() to * do the actual work. It resolves the hostname "host" and set the string * representation of the IP address into the buffer pointed by "ipbuf". diff --git a/src/anet.h b/src/anet.h index c9d54f4a7..b94a0cd17 100644 --- a/src/anet.h +++ b/src/anet.h @@ -66,6 +66,7 @@ int anetBlock(char *err, int fd); int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); +int anetSendTimeout(char *err, int fd, long long ms); int anetPeerToString(int fd, char *ip, size_t ip_len, int *port); int anetKeepAlive(char *err, int fd, int interval); int anetSockName(int fd, char *ip, size_t ip_len, int *port); -- cgit v1.2.1 From d4f6a1711defdc0b63629c797b550abbcca2b96f Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 22 Oct 2014 15:53:45 +0200 Subject: Diskless replication: set / reset socket send timeout. We need to avoid that a child -> slaves transfer can continue forever. We use the same timeout used as global replication timeout, which is documented to also affect I/O operations during bulk transfers. --- src/rdb.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index b8e02b021..039ea03e9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1349,6 +1349,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { slave->id); /* Restore the socket as non-blocking. 
*/ anetNonBlock(NULL,slave->fd); + anetSendTimeout(NULL,slave->fd,0); } } } @@ -1414,6 +1415,7 @@ int rdbSaveToSlavesSockets(void) { * We'll restore it when the children returns (since duped socket * will share the O_NONBLOCK attribute with the parent). */ anetBlock(NULL,slave->fd); + anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000); } } -- cgit v1.2.1 From b50e3215d279d5eee6947bcba619fb1edb407f77 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 22 Oct 2014 15:58:13 +0200 Subject: Translate rio fdset target EWOULDBLOCK error into ETIMEDOUT. EWOULDBLOCK with the fdset rio target is returned when we try to write but the send timeout socket option triggered an error. Better to translate the error in something the user can actually recognize as a timeout. --- src/rio.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rio.c b/src/rio.c index 2083486c2..738e56fd0 100644 --- a/src/rio.c +++ b/src/rio.c @@ -203,7 +203,14 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { size_t nwritten = 0; while(nwritten != count) { retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); - if (retval <= 0) break; + if (retval <= 0) { + /* With blocking sockets, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + break; + } nwritten += retval; } -- cgit v1.2.1 From ebb3bd53c2944e987228bcf2f841f7412865bdab Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 23 Oct 2014 23:10:33 +0200 Subject: Diskless replication: child -> parent communication improved. Child now reports full info to the parent including IDs of slaves in failure state and exit code. 
--- src/rdb.c | 54 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 039ea03e9..8b7764c92 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1292,11 +1292,11 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { server.rdb_save_time_start = -1; /* If the child returns an OK exit code, read the set of slave client - * IDs that received the full RDB payload, closing all the slaves - * which are not among the ones listed. + * IDs and the associated status code. We'll terminate all the slaves + * in error state. * * If the process returned an error, consider the list of slaves that - * can continue to be emtpy, so that it's just a speical case of the + * can continue to be emtpy, so that it's just a special case of the * normal code path. */ ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ ok_slaves[0] = 0; @@ -1306,7 +1306,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) == readlen) { - readlen = ok_slaves[0]*sizeof(uint64_t); + readlen = ok_slaves[0]*sizeof(uint64_t)*2; /* Make space for enough elements as specified by the first * uint64_t element in the array. */ @@ -1334,14 +1334,23 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { uint64_t j; + int errorcode = 0; + /* Search for the slave ID in the reply. In order for a slave to + * continue the replication process, we need to find it in the list, + * and it must have an error code set to 0 (which means success). */ for (j = 0; j < ok_slaves[0]; j++) { - if (slave->id == ok_slaves[j+1]) break; /* Found in OK list. */ + if (slave->id == ok_slaves[2*j+1]) { + errorcode = ok_slaves[2*j+2]; + break; /* Found in slaves list. 
*/ + } } - if (j == ok_slaves[0]) { + if (j == ok_slaves[0] || errorcode != 0) { redisLog(REDIS_WARNING, - "Closing slave %llu: child->slave RDB transfer failed.", - slave->id); + "Closing slave %llu: child->slave RDB transfer failed: %s", + slave->id, + (errorcode == 0) ? "RDB transfer child aborted" + : strerror(errorcode)); freeClient(slave); } else { redisLog(REDIS_WARNING, @@ -1448,29 +1457,34 @@ int rdbSaveToSlavesSockets(void) { /* If we are returning OK, at least one slave was served * with the RDB file as expected, so we need to send a report * to the parent via the pipe. The format of the message is: - * just an array of uint64_t integers (to avoid alignment concerns), - * where the first element is the number of uint64_t elements - * that follows, representing slave client IDs that were - * successfully served. */ - void *msg = zmalloc(sizeof(uint64_t)*(1+numfds)); + * + * ... + * + * len, slave IDs, and slave errors, are all uint64_t integers, + * so basically the reply is composed of 64 bits for the len field + * plus 2 additional 64 bit integers for each entry, for a total + * of 'len' entries. + * + * The 'id' represents the slave's client ID, so that the master + * can match the report with a specific slave, and 'error' is + * set to 0 if the replication process terminated with a success + * or the error code if an error occurred. */ + void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds)); uint64_t *len = msg; uint64_t *ids = len+1; int j, msglen; - *len = 0; + *len = numfds; for (j = 0; j < numfds; j++) { - /* No error? Add it. */ - if (slave_sockets.io.fdset.state[j] == 0) { - ids[*len] = clientids[j]; - (*len)++; - } + *ids++ = clientids[j]; + *ids++ = slave_sockets.io.fdset.state[j]; } /* Write the message to the parent. If we have no good slaves or * we are unable to transfer the message to the parent, we exit * with an error so that the parent will abort the replication * process with all the childre that were waiting. 
*/ - msglen = sizeof(uint64_t)*(1+(*len)); + msglen = sizeof(uint64_t)*(1+2*numfds); if (*len == 0 || write(server.rdb_pipe_write_result_to_parent,msg,msglen) != msglen) -- cgit v1.2.1 From d6797d34c0a3071bf5a458b719cd6c417932f81a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 24 Oct 2014 09:49:22 +0200 Subject: Diskless replication tested with the multiple slaves consistency test. --- tests/integration/replication.tcl | 131 +++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 64 deletions(-) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 767349e56..d2668d736 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -94,79 +94,82 @@ start_server {tags {"repl"}} { } } -start_server {tags {"repl"}} { - set master [srv 0 client] - set master_host [srv 0 host] - set master_port [srv 0 port] - set slaves {} - set load_handle0 [start_write_load $master_host $master_port 3] - set load_handle1 [start_write_load $master_host $master_port 5] - set load_handle2 [start_write_load $master_host $master_port 20] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - start_server {} { - lappend slaves [srv 0 client] +foreach dl {no yes} { + start_server {tags {"repl"}} { + set master [srv 0 client] + $master config set repl-diskless-sync $dl + set master_host [srv 0 host] + set master_port [srv 0 port] + set slaves {} + set load_handle0 [start_write_load $master_host $master_port 3] + set load_handle1 [start_write_load $master_host $master_port 5] + set load_handle2 [start_write_load $master_host $master_port 20] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] start_server {} { lappend slaves [srv 0 client] start_server {} { lappend slaves [srv 0 client] - test "Connect multiple slaves at the same time (issue #141)" { - # 
Send SALVEOF commands to slaves - [lindex $slaves 0] slaveof $master_host $master_port - [lindex $slaves 1] slaveof $master_host $master_port - [lindex $slaves 2] slaveof $master_host $master_port - - # Wait for all the three slaves to reach the "online" state - set retry 500 - while {$retry} { - set info [r -3 info] - if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { - break + start_server {} { + lappend slaves [srv 0 client] + test "Connect multiple slaves at the same time (issue #141), diskless=$dl" { + # Send SALVEOF commands to slaves + [lindex $slaves 0] slaveof $master_host $master_port + [lindex $slaves 1] slaveof $master_host $master_port + [lindex $slaves 2] slaveof $master_host $master_port + + # Wait for all the three slaves to reach the "online" state + set retry 500 + while {$retry} { + set info [r -3 info] + if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slaves not correctly synchronized" + } + + # Stop the write load + stop_write_load $load_handle0 + stop_write_load $load_handle1 + stop_write_load $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 + + # Wait that slaves exit the "loading" state + wait_for_condition 500 100 { + ![string match {*loading:1*} [[lindex $slaves 0] info]] && + ![string match {*loading:1*} [[lindex $slaves 1] info]] && + ![string match {*loading:1*} [[lindex $slaves 2] info]] } else { - incr retry -1 - after 100 + fail "Slaves still loading data after too much time" } - } - if {$retry == 0} { - error "assertion:Slaves not correctly synchronized" - } - # Stop the write load - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 - - # Wait that slaves exit the "loading" state - wait_for_condition 500 100 { - 
![string match {*loading:1*} [[lindex $slaves 0] info]] && - ![string match {*loading:1*} [[lindex $slaves 1] info]] && - ![string match {*loading:1*} [[lindex $slaves 2] info]] - } else { - fail "Slaves still loading data after too much time" - } + # Make sure that slaves and master have same number of keys + wait_for_condition 500 100 { + [$master dbsize] == [[lindex $slaves 0] dbsize] && + [$master dbsize] == [[lindex $slaves 1] dbsize] && + [$master dbsize] == [[lindex $slaves 2] dbsize] + } else { + fail "Different number of keys between masted and slave after too long time." + } - # Make sure that slaves and master have same number of keys - wait_for_condition 500 100 { - [$master dbsize] == [[lindex $slaves 0] dbsize] && - [$master dbsize] == [[lindex $slaves 1] dbsize] && - [$master dbsize] == [[lindex $slaves 2] dbsize] - } else { - fail "Different number of keys between masted and slave after too long time." + # Check digests + set digest [$master debug digest] + set digest0 [[lindex $slaves 0] debug digest] + set digest1 [[lindex $slaves 1] debug digest] + set digest2 [[lindex $slaves 2] debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + assert {$digest eq $digest1} + assert {$digest eq $digest2} } - - # Check digests - set digest [$master debug digest] - set digest0 [[lindex $slaves 0] debug digest] - set digest1 [[lindex $slaves 1] debug digest] - set digest2 [[lindex $slaves 2] debug digest] - assert {$digest ne 0000000000000000000000000000000000000000} - assert {$digest eq $digest0} - assert {$digest eq $digest1} - assert {$digest eq $digest2} - } - } + } + } } } } -- cgit v1.2.1 From 18de5395b219bf407ac06dcf3baae48e4d36d1e5 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 24 Oct 2014 10:12:43 +0200 Subject: Diskless replication documented inside example redis.conf. 
--- redis.conf | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/redis.conf b/redis.conf index 0547cada2..f74d6f512 100644 --- a/redis.conf +++ b/redis.conf @@ -240,6 +240,33 @@ slave-serve-stale-data yes # administrative / dangerous commands. slave-read-only yes +# Replication SYNC strategy: disk or socket. +# +# New slaves and reconnecting slaves that are not able to continue the replication +# process just receiving differences, need to do what is called a "full +# synchronization". An RDB file is transmitted from the master to the slaves. +# The transmission can happen in two different ways: +# +# 1) Disk-backed: The Redis master creates a new process that writes the RDB +# file on disk. Later the file is transferred by the parent +# process to the slaves incrementally. +# 2) Diskless: The Redis master creates a new process that directly writes the +# RDB file to slave sockets, without touching the disk at all. +# +# With disk-backed replication, while the RDB file is generated, more slaves +# can be queued and served with the RDB file as soon as the current child producing +# the RDB file finishes its work. With diskless replication instead once +# the transfer starts, new slaves arriving will be queued and a new transfer +# will start when the current one terminates. +# +# When diskless replication is used, the master waits a configurable amount of +# time (in seconds) before starting the transfer in the hope that multiple slaves +# will arrive and the transfer can be parallelized. +# +# With slow disks and fast (large bandwidth) networks, diskless replication +# works better. +repl-diskless-sync no + # Slaves send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_slave_period option. The default value is 10 # seconds. 
-- cgit v1.2.1 From c4dbc7cdecd876635318aff409e4382780ba2b15 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 24 Oct 2014 10:38:42 +0200 Subject: Remove duplicated log message about starting BGSAVE. --- src/replication.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index fa5ed87d7..24b3455c8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -531,7 +531,6 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); } else { /* Ok we don't have a BGSAVE in progress, let's start one. */ - redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (startBgsaveForReplication() != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); -- cgit v1.2.1 From 707352439c317dd2151d02606de25d2d2a147a67 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 10:36:30 +0100 Subject: Diskless sync delay is now configurable. --- src/config.c | 12 ++++++++++++ src/redis.c | 1 + src/redis.h | 1 + src/replication.c | 6 +++--- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/config.c b/src/config.c index 1b7b16f67..05cb7c9fe 100644 --- a/src/config.c +++ b/src/config.c @@ -274,6 +274,12 @@ void loadServerConfigFromString(char *config) { if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { + server.repl_diskless_sync_delay = atoi(argv[1]); + if (server.repl_diskless_sync_delay < 0) { + err = "repl-diskless-sync-delay can't be negative"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) { long long size = memtoll(argv[1],NULL); if (size <= 0) { @@ -920,6 +926,10 @@ void configSetCommand(redisClient *c) { if (yn == -1) goto badfmt; server.repl_diskless_sync = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync-delay")) { + if 
(getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.repl_diskless_sync_delay = ll; } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; @@ -1058,6 +1068,7 @@ void configGetCommand(redisClient *c) { config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout); config_get_numerical_field("cluster-migration-barrier",server.cluster_migration_barrier); config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor); + config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); /* Bool (yes/no) values */ config_get_bool_field("cluster-require-full-coverage", @@ -1804,6 +1815,7 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT); rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY); rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC); + rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE); rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG); diff --git a/src/redis.c b/src/redis.c index 61a103aaf..09b8103bc 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1481,6 +1481,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index 8583beae6..f4f933eb9 100644 --- a/src/redis.h +++ b/src/redis.h @@ -802,6 +802,7 @@ struct redisServer { int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ diff --git a/src/replication.c b/src/replication.c index 24b3455c8..0b40110a6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2034,9 +2034,9 @@ void replicationCron(void) { } } - if (slaves_waiting && max_idle > REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY) - { - /* Let's start a BGSAVE with disk target. */ + if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { + /* Start a BGSAVE. Usually with socket target, or with disk target + * if there was a recent socket -> disk config change. */ if (startBgsaveForReplication() == REDIS_OK) { /* It started! We need to change the state of slaves * from WAIT_BGSAVE_START to WAIT_BGSAVE_END. */ -- cgit v1.2.1 From 3b9a97984a85e342cc11459a5a75b2cf80d79c08 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 10:42:49 +0100 Subject: Document repl-diskless-sync-delay in redis.conf. --- redis.conf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/redis.conf b/redis.conf index f74d6f512..81dcf5a86 100644 --- a/redis.conf +++ b/redis.conf @@ -267,6 +267,18 @@ slave-read-only yes # works better. 
repl-diskless-sync no +# When diskless replication is enabled, it is possible to configure the delay +# the server waits in order to spawn the child that transfers the RDB via socket +# to the slaves. +# +# This is important since once the transfer starts, it is not possible to serve +# new slaves arriving, that will be queued for the next RDB transfer, so the server +# waits a delay in order to let more slaves arrive. +# +# The delay is specified in seconds, and by default is 5 seconds. To disable +# it entirely just set it to 0 seconds and the transfer will start ASAP. +repl-diskless-sync-delay 5 + # Slaves send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_slave_period option. The default value is 10 # seconds. -- cgit v1.2.1 From a27befc495009693ee4c9c110fe7132f330ac25d Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 10:48:32 +0100 Subject: Diskless replication: log BGSAVE delay only when it is non-zero. --- src/replication.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 0b40110a6..e9c98c5c6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -528,7 +528,8 @@ void syncCommand(redisClient *c) { * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; - redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); + if (server.repl_diskless_sync_delay) + redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); } else { /* Ok we don't have a BGSAVE in progress, let's start one. */ if (startBgsaveForReplication() != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); -- cgit v1.2.1 From 8a416ca46e705e52587c8ef7163b8a158fff8357 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 11:58:20 +0100 Subject: Added a function to get slave name for logs. 
--- src/networking.c | 8 ++------ src/redis.h | 1 + src/replication.c | 35 ++++++++++++++++++++++++++--------- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/networking.c b/src/networking.c index c7b1c9ba7..cc9bbd98c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -678,12 +678,8 @@ void freeClient(redisClient *c) { /* Log link disconnection with slave */ if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) { - char ip[REDIS_IP_STR_LEN]; - - if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { - redisLog(REDIS_WARNING,"Connection with slave %s:%d lost.", - ip, c->slave_listening_port); - } + redisLog(REDIS_WARNING,"Connection with slave %s lost.", + replicationGetSlaveName(c)); } /* Free the query buffer */ diff --git a/src/redis.h b/src/redis.h index f4f933eb9..f5301ab26 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1162,6 +1162,7 @@ void unblockClientWaitingReplicas(redisClient *c); int replicationCountAcksByOffset(long long offset); void replicationSendNewlineToMaster(void); long long replicationGetSlaveOffset(void); +char *replicationGetSlaveName(redisClient *c); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/replication.c b/src/replication.c index e9c98c5c6..048acfaab 100644 --- a/src/replication.c +++ b/src/replication.c @@ -41,6 +41,30 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); void replicationSendAck(void); +/* --------------------------- Utility functions ---------------------------- */ + +/* Return the pointer to a string representing the slave ip:listening_port + * pair. Mostly useful for logging, since we want to log a slave using its + * IP address and its listening port which is more clear for the user, for + * example: "Closing connection with slave 10.1.2.3:6380". 
*/ +char *replicationGetSlaveName(redisClient *c) { + static char buf[REDIS_PEER_ID_LEN]; + char ip[REDIS_IP_STR_LEN]; + + ip[0] = '\0'; + buf[0] = '\0'; + if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { + if (c->slave_listening_port) + snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port); + else + snprintf(buf,sizeof(buf),"%s:",ip); + } else { + snprintf(buf,sizeof(buf),"client id #%llu", + (unsigned long long) c->id); + } + return buf; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -1973,15 +1997,8 @@ void replicationCron(void) { if (slave->flags & REDIS_PRE_PSYNC) continue; if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { - char ip[REDIS_IP_STR_LEN]; - int port; - - if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) { - redisLog(REDIS_WARNING, - "Disconnecting timedout slave: %s:%d", - ip, slave->slave_listening_port); - } - freeClient(slave); + redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", + replicationGetSlaveName(slave)); } } } -- cgit v1.2.1 From 775cc30a98cd8f3ab3a14c76973234f84df3262d Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 12:23:03 +0100 Subject: Use new slave name function for diskless repl reporting. --- src/rdb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 8b7764c92..d2fd405db 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1347,15 +1347,15 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { } if (j == ok_slaves[0] || errorcode != 0) { redisLog(REDIS_WARNING, - "Closing slave %llu: child->slave RDB transfer failed: %s", - slave->id, + "Closing slave %s: child->slave RDB transfer failed: %s", + replicationGetSlaveName(slave), (errorcode == 0) ? 
"RDB transfer child aborted" : strerror(errorcode)); freeClient(slave); } else { redisLog(REDIS_WARNING, - "Slave %llu correctly received the streamed RDB file.", - slave->id); + "Slave %s correctly received the streamed RDB file.", + replicationGetSlaveName(slave)); /* Restore the socket as non-blocking. */ anetNonBlock(NULL,slave->fd); anetSendTimeout(NULL,slave->fd,0); -- cgit v1.2.1 From 4b8f4b90b904c803cdc4a3ba82fee6e8ee8423fe Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 Oct 2014 12:30:07 +0100 Subject: Log slave ip:port in more log messages. --- src/replication.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/replication.c b/src/replication.c index 048acfaab..43d5b6935 100644 --- a/src/replication.c +++ b/src/replication.c @@ -369,7 +369,8 @@ int masterTryPartialResynchronization(redisClient *c) { "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { - redisLog(REDIS_NOTICE,"Full resync requested by slave."); + redisLog(REDIS_NOTICE,"Full resync requested by slave %s", + replicationGetSlaveName(c)); } goto need_full_resync; } @@ -382,10 +383,10 @@ int masterTryPartialResynchronization(redisClient *c) { psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { redisLog(REDIS_NOTICE, - "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); + "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > server.master_repl_offset) { redisLog(REDIS_WARNING, - "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); + "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; } @@ -408,7 +409,9 @@ int masterTryPartialResynchronization(redisClient *c) { } psync_len = 
addReplyReplicationBacklog(c,psync_offset); redisLog(REDIS_NOTICE, - "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); + "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", + replicationGetSlaveName(c), + psync_len, psync_offset); /* Note that we don't need to set the selected DB at server.slaveseldb * to -1 to force the master to emit SELECT, since the slave already * has this state from the previous connection with the master. */ @@ -475,7 +478,8 @@ void syncCommand(redisClient *c) { return; } - redisLog(REDIS_NOTICE,"Slave asks for synchronization"); + redisLog(REDIS_NOTICE,"Slave %s asks for synchronization", + replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however @@ -750,7 +754,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { putSlaveOnline(slave); redisLog(REDIS_NOTICE, - "Synchronization with slave succeeded (socket)"); + "Synchronization with slave %s succeeded (socket)", + replicationGetSlaveName(slave)); } else { if (bgsaveerr != REDIS_OK) { freeClient(slave); -- cgit v1.2.1 From 9ec22d9223ca3d74aa81dd5af86809ceeb730670 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 29 Oct 2014 12:48:22 +0100 Subject: Diskless replication: missing listRewind() added. This caused BGSAVE to be triggered a second time without any need when we switch from socket to disk target via the command CONFIG SET repl-diskless-sync no and there is already a slave waiting for the BGSAVE to start. Also comments clarified about what is happening. 
--- src/replication.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 43d5b6935..77f9fa8bc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2062,7 +2062,11 @@ void replicationCron(void) { * if there was a recent socket -> disk config change. */ if (startBgsaveForReplication() == REDIS_OK) { /* It started! We need to change the state of slaves - * from WAIT_BGSAVE_START to WAIT_BGSAVE_END. */ + * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case + * the current target is disk. Otherwise it was already done + * by rdbSaveToSlavesSockets() which is called by + * startBgsaveForReplication(). */ + listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) -- cgit v1.2.1