summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-10-10 17:44:06 +0200
committerantirez <antirez@gmail.com>2014-10-10 17:44:06 +0200
commit850ea57c37e517eb0f10d8fc319332ca339d0ba2 (patch)
treebe207b09e90866ee6820faac386c6c25e02a1930
parent29db3227ab8980f098080371ef57c4e1eeb38180 (diff)
downloadredis-850ea57c37e517eb0f10d8fc319332ca339d0ba2.tar.gz
rio.c: draft implementation of fdset target implemented.
-rw-r--r--src/rio.c60
-rw-r--r--src/rio.h9
2 files changed, 69 insertions, 0 deletions
diff --git a/src/rio.c b/src/rio.c
index aa5061b75..88f6781e0 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -142,6 +142,66 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
+/* ------------------- File descriptors set implementation ------------------- */
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
+ size_t retval;
+ int j;
+ unsigned char *p = (unsigned char*) buf;
+
+ /* Write in little chunchs so that when there are big writes we
+ * parallelize while the kernel is sending data in background to
+ * the TCP socket. */
+ while(len) {
+ size_t count = len < 1024 ? len : 1024;
+ for (j = 0; j < r->io.fdset.numfds; j++) {
+ retval = write(r->io.fdset.fds[j],p,count);
+ if (retval != count) return 0;
+ }
+ p += count;
+ len -= count;
+ r->io.fdset.pos += count;
+ }
+ return 1;
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
+ REDIS_NOTUSED(r);
+ REDIS_NOTUSED(buf);
+ REDIS_NOTUSED(len);
+ return 0; /* Error, this target does not support reading. */
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdsetTell(rio *r) {
+ return r->io.fdset.pos;
+}
+
+static const rio rioFdsetIO = {
+ rioFdsetRead,
+ rioFdsetWrite,
+ rioFdsetTell,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+void rioInitWithFdset(rio *r, int *fds, int numfds) {
+ *r = rioFdsetIO;
+ r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
+ memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
+ r->io.fdset.numfds = numfds;
+ r->io.fdset.pos = 0;
+}
+
+void rioFreeFdset(rio *r) {
+ zfree(r->io.fdset.fds);
+}
+
/* ---------------------------- Generic functions ---------------------------- */
/* This function can be installed both in memory and file streams when checksum
diff --git a/src/rio.h b/src/rio.h
index 2d12c6cc7..0d485d454 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -61,15 +61,23 @@ struct _rio {
/* Backend-specific vars. */
union {
+ /* In-memory buffer target. */
struct {
sds ptr;
off_t pos;
} buffer;
+ /* Stdio file pointer target. */
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
+ /* Multiple FDs target (used to write to N sockets). */
+ struct {
+ int *fds;
+ int numfds;
+ off_t pos;
+ } fdset;
} io;
};
@@ -111,6 +119,7 @@ static inline off_t rioTell(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
+void rioInitWithFdset(rio *r, int *fds, int numfds);
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);