summaryrefslogtreecommitdiff
path: root/src/rio.c
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-08-11 16:07:53 +0300
committerYossi Gottlieb <yossigo@gmail.com>2019-10-07 21:06:30 +0300
commit5a477946065bcf05b335ededd6b794e82882ab73 (patch)
tree53dee2990f0d86e042f979322d998c25ade4879d /src/rio.c
parentb087dd1db60ed23d9e59304deb0b1599437f6e23 (diff)
downloadredis-5a477946065bcf05b335ededd6b794e82882ab73.tar.gz
diskless replication rdb transfer uses pipe, and writes to sockets form the parent process.
misc: - handle SSL_has_pending by iterating though these in beforeSleep, and setting timeout of 0 to aeProcessEvents - fix issue with epoll signaling EPOLLHUP and EPOLLERR only to the write handlers. (needed to detect the rdb pipe was closed) - add key-load-delay config for testing - trim connShutdown which is no longer needed - rioFdsetWrite -> rioFdWrite - simplified since there's no longer need to write to multiple FDs - don't detect rdb child exited (don't call wait3) until we detect the pipe is closed - Cleanup bad optimization from rio.c, add another one
Diffstat (limited to 'src/rio.c')
-rw-r--r--src/rio.c142
1 files changed, 58 insertions, 84 deletions
diff --git a/src/rio.c b/src/rio.c
index 5e3b4a06e..c8c924380 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -272,85 +272,67 @@ void rioFreeConn(rio *r, sds *remaining) {
r->io.conn.buf = NULL;
}
-/* ------------------- File descriptors set implementation ------------------
- * This target is used to write the RDB file to N different replicas via
- * sockets, when the master just streams the data to the replicas without
- * creating an RDB on-disk image (diskless replication option).
+/* ------------------- File descriptor implementation ------------------
+ * This target is used to write the RDB file to pipe, when the master just
+ * streams the data to the replicas without creating an RDB on-disk image
+ * (diskless replication option).
* It only implements writes. */
/* Returns 1 or 0 for success/failure.
- * The function returns success as long as we are able to correctly write
- * to at least one file descriptor.
*
* When buf is NULL and len is 0, the function performs a flush operation
* if there is some pending buffer, so this function is also used in order
- * to implement rioFdsetFlush(). */
-static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
+ * to implement rioFdFlush(). */
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
ssize_t retval;
- int j;
unsigned char *p = (unsigned char*) buf;
int doflush = (buf == NULL && len == 0);
- /* To start we always append to our buffer. If it gets larger than
- * a given size, we actually write to the sockets. */
- if (len) {
- r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len);
- len = 0; /* Prevent entering the while below if we don't flush. */
- if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1;
- }
-
- if (doflush) {
- p = (unsigned char*) r->io.connset.buf;
- len = sdslen(r->io.connset.buf);
+ /* For small writes, we rather keep the data in user-space buffer, and flush
+ * it only when it grows. however for larger writes, we prefer to flush
+ * any pre-existing buffer, and write the new one directly without reallocs
+ * and memory copying. */
+ if (len > PROTO_IOBUF_LEN) {
+ /* First, flush any pre-existing buffered data. */
+ if (sdslen(r->io.fd.buf)) {
+ if (rioFdWrite(r, NULL, 0) == 0)
+ return 0;
+ }
+ /* Write the new data, keeping 'p' and 'len' from the input. */
+ } else {
+ if (len) {
+ r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
+ if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
+ doflush = 1;
+ if (!doflush)
+ return 1;
+ }
+ /* Flusing the buffered data. set 'p' and 'len' accordintly. */
+ p = (unsigned char*) r->io.fd.buf;
+ len = sdslen(r->io.fd.buf);
}
- /* Write in little chunchs so that when there are big writes we
- * parallelize while the kernel is sending data in background to
- * the TCP socket. */
- while(len) {
- size_t count = len < 1024 ? len : 1024;
- int broken = 0;
- for (j = 0; j < r->io.connset.numconns; j++) {
- if (r->io.connset.state[j] != 0) {
- /* Skip FDs alraedy in error. */
- broken++;
- continue;
- }
-
- /* Make sure to write 'count' bytes to the socket regardless
- * of short writes. */
- size_t nwritten = 0;
- while(nwritten != count) {
- retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten);
- if (retval <= 0) {
- /* With blocking sockets, which is the sole user of this
- * rio target, EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option, so we translate the error
- * into one more recognizable by the user. */
- if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
- break;
- }
- nwritten += retval;
- }
-
- if (nwritten != count) {
- /* Mark this FD as broken. */
- r->io.connset.state[j] = errno;
- if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO;
- }
+ size_t nwritten = 0;
+ while(nwritten != len) {
+ retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
+ if (retval <= 0) {
+ /* With blocking io, which is the sole user of this
+ * rio target, EWOULDBLOCK is returned only because of
+ * the SO_SNDTIMEO socket option, so we translate the error
+ * into one more recognizable by the user. */
+ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0; /* error. */
}
- if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */
- p += count;
- len -= count;
- r->io.connset.pos += count;
+ nwritten += retval;
}
- if (doflush) sdsclear(r->io.connset.buf);
+ r->io.fd.pos += len;
+ sdsclear(r->io.fd.buf);
return 1;
}
/* Returns 1 or 0 for success/failure. */
-static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
@@ -358,23 +340,23 @@ static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
}
/* Returns read/write position in file. */
-static off_t rioFdsetTell(rio *r) {
- return r->io.connset.pos;
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.pos;
}
/* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */
-static int rioFdsetFlush(rio *r) {
+static int rioFdFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */
- return rioFdsetWrite(r,NULL,0);
+ return rioFdWrite(r,NULL,0);
}
-static const rio rioFdsetIO = {
- rioFdsetRead,
- rioFdsetWrite,
- rioFdsetTell,
- rioFdsetFlush,
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* flags */
@@ -383,24 +365,16 @@ static const rio rioFdsetIO = {
{ { NULL, 0 } } /* union for io-specific vars */
};
-void rioInitWithConnset(rio *r, connection **conns, int numconns) {
- int j;
-
- *r = rioFdsetIO;
- r->io.connset.conns = zmalloc(sizeof(connection *)*numconns);
- r->io.connset.state = zmalloc(sizeof(int)*numconns);
- memcpy(r->io.connset.conns,conns,sizeof(connection *)*numconns);
- for (j = 0; j < numconns; j++) r->io.connset.state[j] = 0;
- r->io.connset.numconns = numconns;
- r->io.connset.pos = 0;
- r->io.connset.buf = sdsempty();
+void rioInitWithFd(rio *r, int fd) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.buf = sdsempty();
}
/* release the rio stream. */
-void rioFreeConnset(rio *r) {
- zfree(r->io.connset.conns);
- zfree(r->io.connset.state);
- sdsfree(r->io.connset.buf);
+void rioFreeFd(rio *r) {
+ sdsfree(r->io.fd.buf);
}
/* ---------------------------- Generic functions ---------------------------- */