diff options
author | antirez <antirez@gmail.com> | 2014-10-17 16:45:48 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2014-10-17 16:45:53 +0200 |
commit | 525c488f639672e5b36cc67ede857d25a39c016d (patch) | |
tree | d222e7acb3d79f63c8aea5660b5c42c8c0c6a7c8 /src | |
parent | 74f90c61232859f35db4eabf5b0bf1c8e4123bf0 (diff) | |
download | redis-525c488f639672e5b36cc67ede857d25a39c016d.tar.gz |
rio fdset target: handle short writes.
While the socket is set in blocking mode, we still can get short writes
writing to a socket.
Diffstat (limited to 'src')
-rw-r--r-- | src/rdb.c | 6 | ||||
-rw-r--r-- | src/replication.c | 1 | ||||
-rw-r--r-- | src/rio.c | 13 |
3 files changed, 18 insertions, 2 deletions
@@ -1347,6 +1347,8 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { redisLog(REDIS_WARNING, "Slave %llu correctly received the streamed RDB file.", slave->id); + /* Restore the socket as non-blocking. */ + anetNonBlock(NULL,slave->fd); } } } @@ -1408,6 +1410,10 @@ int rdbSaveToSlavesSockets(void) { clientids[numfds] = slave->id; fds[numfds++] = slave->fd; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + /* Put the socket in non-blocking mode to simplify RDB transfer. + * We'll restore it when the children returns (since duped socket + * will share the O_NONBLOCK attribute with the parent). */ + anetBlock(NULL,slave->fd); } } diff --git a/src/replication.c b/src/replication.c index fa5ed87d7..ea8265e38 100644 --- a/src/replication.c +++ b/src/replication.c @@ -888,6 +888,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } nread = read(fd,buf,readlen); + printf("NREAD %d (%d)\n", (int)nread, (int)readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); @@ -197,8 +197,17 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { broken++; continue; } - retval = write(r->io.fdset.fds[j],p,count); - if (retval != count) { + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) break; + nwritten += retval; + } + + if (nwritten != count) { /* Mark this FD as broken. */ r->io.fdset.state[j] = errno; if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; |