diff options
Diffstat (limited to 'src/rio.c')
-rw-r--r-- | src/rio.c | 182 |
1 files changed, 165 insertions, 17 deletions
@@ -53,7 +53,9 @@ #include "util.h" #include "crc64.h" #include "config.h" -#include "redis.h" +#include "server.h" + +/* ------------------------- Buffer I/O implementation ----------------------- */ /* Returns 1 or 0 for success/failure. */ static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { @@ -76,6 +78,33 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + UNUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. */ +} + +static const rio rioBufferIO = { + rioBufferRead, + rioBufferWrite, + rioBufferTell, + rioBufferFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithBuffer(rio *r, sds s) { + *r = rioBufferIO; + r->io.buffer.ptr = s; + r->io.buffer.pos = 0; +} + +/* --------------------- Stdio file pointer implementation ------------------- */ + /* Returns 1 or 0 for success/failure. */ static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t retval; @@ -103,21 +132,17 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } -static const rio rioBufferIO = { - rioBufferRead, - rioBufferWrite, - rioBufferTell, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -132,12 +157,134 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } -void rioInitWithBuffer(rio *r, sds s) { - *r = rioBufferIO; - r->io.buffer.ptr = s; - r->io.buffer.pos = 0; +/* ------------------- File descriptors set implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. + * The function returns success as long as we are able to correctly write + * to at least one file descriptor. + * + * When buf is NULL and len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). */ +static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + ssize_t retval; + int j; + unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while below if we don't flush. */ + if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } + + /* Write in little chunchs so that when there are big writes we + * parallelize while the kernel is sending data in background to + * the TCP socket. */ + while(len) { + size_t count = len < 1024 ? len : 1024; + int broken = 0; + for (j = 0; j < r->io.fdset.numfds; j++) { + if (r->io.fdset.state[j] != 0) { + /* Skip FDs alraedy in error. */ + broken++; + continue; + } + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) { + /* With blocking sockets, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + break; + } + nwritten += retval; + } + + if (nwritten != count) { + /* Mark this FD as broken. */ + r->io.fdset.state[j] = errno; + if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; + } + } + if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */ + p += count; + len -= count; + r->io.fdset.pos += count; + } + + if (doflush) sdsclear(r->io.fdset.buf); + return 1; } +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdsetRead(rio *r, void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdsetTell(rio *r) { + return r->io.fdset.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + +static const rio rioFdsetIO = { + rioFdsetRead, + rioFdsetWrite, + rioFdsetTell, + rioFdsetFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFdset(rio *r, int *fds, int numfds) { + int j; + + *r = rioFdsetIO; + r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + r->io.fdset.state = zmalloc(sizeof(int)*numfds); + memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; + r->io.fdset.numfds = numfds; + r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); +} + +/* release the rio stream. */ +void rioFreeFdset(rio *r) { + zfree(r->io.fdset.fds); + zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); +} + +/* ---------------------------- Generic functions ---------------------------- */ + /* This function can be installed both in memory and file streams when checksum * computation is needed. */ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { @@ -153,11 +300,12 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { * disk I/O concentrated in very little time. When we fsync in an explicit * way instead the I/O pressure is more distributed across time. */ void rioSetAutoSync(rio *r, off_t bytes) { - redisAssert(r->read == rioFileIO.read); + serverAssert(r->read == rioFileIO.read); r->io.file.autosync = bytes; } -/* ------------------------------ Higher level interface --------------------------- +/* --------------------------- Higher level interface -------------------------- + * * The following higher level functions use lower level rio.c functions to help * generating the Redis protocol for the Append Only File. */ |