summaryrefslogtreecommitdiff
path: root/src/rio.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-10-17 11:36:12 +0200
committerantirez <antirez@gmail.com>2014-10-17 11:36:12 +0200
commit10aafdad56fa79bd7f95d9b190054b2e56b6cddd (patch)
treef5ab58cd684cde63040f3bdeedf2eff29ac24ada /src/rio.c
parentb1337b15b6b090315d884d7372eea344926ae95b (diff)
downloadredis-10aafdad56fa79bd7f95d9b190054b2e56b6cddd.tar.gz
Diskless replication: rio fdset target new supports buffering.
To perform a socket write() for each RDB rio API write call was extremely unefficient, so now rio has minimal buffering capabilities. Writes are accumulated into a buffer and only when a given limit is reacehd are actually wrote to the N slaves FDs. Trivia: rio lacked support for buffering since our targets were: 1) Memory buffers. 2) C standard I/O. Both were buffered already.
Diffstat (limited to 'src/rio.c')
-rw-r--r--src/rio.c48
1 files changed, 47 insertions, 1 deletions
diff --git a/src/rio.c b/src/rio.c
index dbda4c668..5153ed28e 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -78,10 +78,18 @@ static off_t rioBufferTell(rio *r) {
return r->io.buffer.pos;
}
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioBufferFlush(rio *r) {
+ REDIS_NOTUSED(r);
+ return 1; /* Nothing to do, our write just appends to the buffer. */
+}
+
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
+ rioBufferFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
@@ -124,10 +132,17 @@ static off_t rioFileTell(rio *r) {
return ftello(r->io.file.fp);
}
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFileFlush(rio *r) {
+ return (fflush(r->io.file.fp) == 0) ? 1 : 0;
+}
+
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
+ rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
@@ -146,11 +161,29 @@ void rioInitWithFile(rio *r, FILE *fp) {
/* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write
- * to at least one file descriptor. */
+ * to at least one file descriptor.
+ *
+ * When buf is NULL adn len is 0, the function performs a flush operation
+ * if there is some pending buffer, so this function is also used in order
+ * to implement rioFdsetFlush(). */
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
size_t retval;
int j;
unsigned char *p = (unsigned char*) buf;
+ int doflush = (buf == NULL && len == 0);
+
+ /* To start we always append to our buffer. If it gets larger than
+ * a given size, we actually write to the sockets. */
+ if (len) {
+ r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
+ len = 0; /* Prevent entering the while belove if we don't flush. */
+ if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1;
+ }
+
+ if (doflush) {
+ p = (unsigned char*) r->io.fdset.buf;
+ len = sdslen(r->io.fdset.buf);
+ }
/* Write in little chunchs so that when there are big writes we
* parallelize while the kernel is sending data in background to
@@ -176,6 +209,8 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
len -= count;
r->io.fdset.pos += count;
}
+
+ if (doflush) sdsclear(r->io.fdset.buf);
return 1;
}
@@ -192,10 +227,19 @@ static off_t rioFdsetTell(rio *r) {
return r->io.fdset.pos;
}
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdsetFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdsetWrite(r,NULL,0);
+}
+
static const rio rioFdsetIO = {
rioFdsetRead,
rioFdsetWrite,
rioFdsetTell,
+ rioFdsetFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
@@ -213,11 +257,13 @@ void rioInitWithFdset(rio *r, int *fds, int numfds) {
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
r->io.fdset.numfds = numfds;
r->io.fdset.pos = 0;
+ r->io.fdset.buf = sdsempty();
}
void rioFreeFdset(rio *r) {
zfree(r->io.fdset.fds);
zfree(r->io.fdset.state);
+ sdsfree(r->io.fdset.buf);
}
/* ---------------------------- Generic functions ---------------------------- */