diff options
author | Oran Agra <oran@redislabs.com> | 2019-08-11 16:07:53 +0300 |
---|---|---|
committer | Yossi Gottlieb <yossigo@gmail.com> | 2019-10-07 21:06:30 +0300 |
commit | 5a477946065bcf05b335ededd6b794e82882ab73 (patch) | |
tree | 53dee2990f0d86e042f979322d998c25ade4879d /src/rio.c | |
parent | b087dd1db60ed23d9e59304deb0b1599437f6e23 (diff) | |
download | redis-5a477946065bcf05b335ededd6b794e82882ab73.tar.gz |
diskless replication rdb transfer uses pipe, and writes to sockets from the parent process.
misc:
- handle SSL_has_pending by iterating through these in beforeSleep, and setting timeout of 0 to aeProcessEvents
- fix issue with epoll signaling EPOLLHUP and EPOLLERR only to the write handlers. (needed to detect the rdb pipe was closed)
- add key-load-delay config for testing
- trim connShutdown which is no longer needed
- rioFdsetWrite -> rioFdWrite - simplified since there's no longer need to write to multiple FDs
- don't detect rdb child exited (don't call wait3) until we detect the pipe is closed
- Cleanup bad optimization from rio.c, add another one
Diffstat (limited to 'src/rio.c')
-rw-r--r-- | src/rio.c | 142 |
1 files changed, 58 insertions, 84 deletions
@@ -272,85 +272,67 @@ void rioFreeConn(rio *r, sds *remaining) { r->io.conn.buf = NULL; } -/* ------------------- File descriptors set implementation ------------------ - * This target is used to write the RDB file to N different replicas via - * sockets, when the master just streams the data to the replicas without - * creating an RDB on-disk image (diskless replication option). +/* ------------------- File descriptor implementation ------------------ + * This target is used to write the RDB file to pipe, when the master just + * streams the data to the replicas without creating an RDB on-disk image + * (diskless replication option). * It only implements writes. */ /* Returns 1 or 0 for success/failure. - * The function returns success as long as we are able to correctly write - * to at least one file descriptor. * * When buf is NULL and len is 0, the function performs a flush operation * if there is some pending buffer, so this function is also used in order - * to implement rioFdsetFlush(). */ -static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + * to implement rioFdFlush(). */ +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { ssize_t retval; - int j; unsigned char *p = (unsigned char*) buf; int doflush = (buf == NULL && len == 0); - /* To start we always append to our buffer. If it gets larger than - * a given size, we actually write to the sockets. */ - if (len) { - r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len); - len = 0; /* Prevent entering the while below if we don't flush. */ - if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1; - } - - if (doflush) { - p = (unsigned char*) r->io.connset.buf; - len = sdslen(r->io.connset.buf); + /* For small writes, we rather keep the data in user-space buffer, and flush + * it only when it grows. however for larger writes, we prefer to flush + * any pre-existing buffer, and write the new one directly without reallocs + * and memory copying. 
*/ + if (len > PROTO_IOBUF_LEN) { + /* First, flush any pre-existing buffered data. */ + if (sdslen(r->io.fd.buf)) { + if (rioFdWrite(r, NULL, 0) == 0) + return 0; + } + /* Write the new data, keeping 'p' and 'len' from the input. */ + } else { + if (len) { + r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len); + if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) + doflush = 1; + if (!doflush) + return 1; + } + /* Flusing the buffered data. set 'p' and 'len' accordintly. */ + p = (unsigned char*) r->io.fd.buf; + len = sdslen(r->io.fd.buf); } - /* Write in little chunchs so that when there are big writes we - * parallelize while the kernel is sending data in background to - * the TCP socket. */ - while(len) { - size_t count = len < 1024 ? len : 1024; - int broken = 0; - for (j = 0; j < r->io.connset.numconns; j++) { - if (r->io.connset.state[j] != 0) { - /* Skip FDs alraedy in error. */ - broken++; - continue; - } - - /* Make sure to write 'count' bytes to the socket regardless - * of short writes. */ - size_t nwritten = 0; - while(nwritten != count) { - retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten); - if (retval <= 0) { - /* With blocking sockets, which is the sole user of this - * rio target, EWOULDBLOCK is returned only because of - * the SO_SNDTIMEO socket option, so we translate the error - * into one more recognizable by the user. */ - if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; - break; - } - nwritten += retval; - } - - if (nwritten != count) { - /* Mark this FD as broken. */ - r->io.connset.state[j] = errno; - if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO; - } + size_t nwritten = 0; + while(nwritten != len) { + retval = write(r->io.fd.fd,p+nwritten,len-nwritten); + if (retval <= 0) { + /* With blocking io, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. 
*/ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; /* error. */ } - if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */ - p += count; - len -= count; - r->io.connset.pos += count; + nwritten += retval; } - if (doflush) sdsclear(r->io.connset.buf); + r->io.fd.pos += len; + sdsclear(r->io.fd.buf); return 1; } /* Returns 1 or 0 for success/failure. */ -static size_t rioFdsetRead(rio *r, void *buf, size_t len) { +static size_t rioFdRead(rio *r, void *buf, size_t len) { UNUSED(r); UNUSED(buf); UNUSED(len); @@ -358,23 +340,23 @@ static size_t rioFdsetRead(rio *r, void *buf, size_t len) { } /* Returns read/write position in file. */ -static off_t rioFdsetTell(rio *r) { - return r->io.connset.pos; +static off_t rioFdTell(rio *r) { + return r->io.fd.pos; } /* Flushes any buffer to target device if applicable. Returns 1 on success * and 0 on failures. */ -static int rioFdsetFlush(rio *r) { +static int rioFdFlush(rio *r) { /* Our flush is implemented by the write method, that recognizes a * buffer set to NULL with a count of zero as a flush request. 
*/ - return rioFdsetWrite(r,NULL,0); + return rioFdWrite(r,NULL,0); } -static const rio rioFdsetIO = { - rioFdsetRead, - rioFdsetWrite, - rioFdsetTell, - rioFdsetFlush, +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* flags */ @@ -383,24 +365,16 @@ static const rio rioFdsetIO = { { { NULL, 0 } } /* union for io-specific vars */ }; -void rioInitWithConnset(rio *r, connection **conns, int numconns) { - int j; - - *r = rioFdsetIO; - r->io.connset.conns = zmalloc(sizeof(connection *)*numconns); - r->io.connset.state = zmalloc(sizeof(int)*numconns); - memcpy(r->io.connset.conns,conns,sizeof(connection *)*numconns); - for (j = 0; j < numconns; j++) r->io.connset.state[j] = 0; - r->io.connset.numconns = numconns; - r->io.connset.pos = 0; - r->io.connset.buf = sdsempty(); +void rioInitWithFd(rio *r, int fd) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.buf = sdsempty(); } /* release the rio stream. */ -void rioFreeConnset(rio *r) { - zfree(r->io.connset.conns); - zfree(r->io.connset.state); - sdsfree(r->io.connset.buf); +void rioFreeFd(rio *r) { + sdsfree(r->io.fd.buf); } /* ---------------------------- Generic functions ---------------------------- */ |