From 10aafdad56fa79bd7f95d9b190054b2e56b6cddd Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Oct 2014 11:36:12 +0200 Subject: Diskless replication: rio fdset target new supports buffering. To perform a socket write() for each RDB rio API write call was extremely unefficient, so now rio has minimal buffering capabilities. Writes are accumulated into a buffer and only when a given limit is reacehd are actually wrote to the N slaves FDs. Trivia: rio lacked support for buffering since our targets were: 1) Memory buffers. 2) C standard I/O. Both were buffered already. --- src/rdb.c | 3 +++ src/rio.c | 48 +++++++++++++++++++++++++++++++++++++++++++++++- src/rio.h | 6 ++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index 45beae14d..c6a1ec691 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1425,6 +1425,9 @@ int rdbSaveToSlavesSockets(void) { redisSetProcTitle("redis-rdb-to-slaves"); retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0) + retval = REDIS_ERR; + if (retval == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); diff --git a/src/rio.c b/src/rio.c index dbda4c668..5153ed28e 100644 --- a/src/rio.c +++ b/src/rio.c @@ -78,10 +78,18 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + REDIS_NOTUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. */ +} + static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, + rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -124,10 +132,17 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} + static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -146,11 +161,29 @@ void rioInitWithFile(rio *r, FILE *fp) { /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write - * to at least one file descriptor. */ + * to at least one file descriptor. + * + * When buf is NULL adn len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). */ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { size_t retval; int j; unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while belove if we don't flush. */ + if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } /* Write in little chunchs so that when there are big writes we * parallelize while the kernel is sending data in background to @@ -176,6 +209,8 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { len -= count; r->io.fdset.pos += count; } + + if (doflush) sdsclear(r->io.fdset.buf); return 1; } @@ -192,10 +227,19 @@ static off_t rioFdsetTell(rio *r) { return r->io.fdset.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + static const rio rioFdsetIO = { rioFdsetRead, rioFdsetWrite, rioFdsetTell, + rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -213,11 +257,13 @@ void rioInitWithFdset(rio *r, int *fds, int numfds) { for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; r->io.fdset.numfds = numfds; r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); } void rioFreeFdset(rio *r) { zfree(r->io.fdset.fds); zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); } /* ---------------------------- Generic functions ---------------------------- */ diff --git a/src/rio.h b/src/rio.h index b73f4a050..e5fa0cd33 100644 --- a/src/rio.h +++ b/src/rio.h @@ -43,6 +43,7 @@ struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); + int (*flush)(struct _rio *); /* The update_cksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf @@ -78,6 +79,7 @@ struct _rio { int *state; /* Error state of each fd. 0 (if ok) or errno. */ int numfds; off_t pos; + sds buf; } fdset; } io; }; @@ -118,6 +120,10 @@ static inline off_t rioTell(rio *r) { return r->tell(r); } +static inline int rioFlush(rio *r) { + return r->flush(r); +} + void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithFdset(rio *r, int *fds, int numfds); -- cgit v1.2.1