diff options
Diffstat (limited to 'src/rio.c')
-rw-r--r-- | src/rio.c | 48 |
1 files changed, 47 insertions, 1 deletions
@@ -78,10 +78,18 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + REDIS_NOTUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. */ +} + static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, + rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -124,10 +132,17 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} + static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -146,11 +161,29 @@ void rioInitWithFile(rio *r, FILE *fp) { /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write - * to at least one file descriptor. */ + * to at least one file descriptor. + * + * When buf is NULL adn len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). */ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { size_t retval; int j; unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while belove if we don't flush. */ + if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } /* Write in little chunchs so that when there are big writes we * parallelize while the kernel is sending data in background to @@ -176,6 +209,8 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { len -= count; r->io.fdset.pos += count; } + + if (doflush) sdsclear(r->io.fdset.buf); return 1; } @@ -192,10 +227,19 @@ static off_t rioFdsetTell(rio *r) { return r->io.fdset.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + static const rio rioFdsetIO = { rioFdsetRead, rioFdsetWrite, rioFdsetTell, + rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -213,11 +257,13 @@ void rioInitWithFdset(rio *r, int *fds, int numfds) { for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; r->io.fdset.numfds = numfds; r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); } void rioFreeFdset(rio *r) { zfree(r->io.fdset.fds); zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); } /* ---------------------------- Generic functions ---------------------------- */ |