summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-10-17 16:45:48 +0200
committerantirez <antirez@gmail.com>2014-10-17 16:45:53 +0200
commit525c488f639672e5b36cc67ede857d25a39c016d (patch)
treed222e7acb3d79f63c8aea5660b5c42c8c0c6a7c8
parent74f90c61232859f35db4eabf5b0bf1c8e4123bf0 (diff)
downloadredis-525c488f639672e5b36cc67ede857d25a39c016d.tar.gz
rio fdset target: handle short writes.
While the socket is set in blocking mode, we still can get short writes writing to a socket.
-rw-r--r--src/rdb.c6
-rw-r--r--src/replication.c1
-rw-r--r--src/rio.c13
3 files changed, 18 insertions, 2 deletions
diff --git a/src/rdb.c b/src/rdb.c
index c6a1ec691..b8e02b021 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1347,6 +1347,8 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
redisLog(REDIS_WARNING,
"Slave %llu correctly received the streamed RDB file.",
slave->id);
+ /* Restore the socket as non-blocking. */
+ anetNonBlock(NULL,slave->fd);
}
}
}
@@ -1408,6 +1410,10 @@ int rdbSaveToSlavesSockets(void) {
clientids[numfds] = slave->id;
fds[numfds++] = slave->fd;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
+ /* Put the socket in non-blocking mode to simplify RDB transfer.
+ * We'll restore it when the children returns (since duped socket
+ * will share the O_NONBLOCK attribute with the parent). */
+ anetBlock(NULL,slave->fd);
}
}
diff --git a/src/replication.c b/src/replication.c
index fa5ed87d7..ea8265e38 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -888,6 +888,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
nread = read(fd,buf,readlen);
+ printf("NREAD %d (%d)\n", (int)nread, (int)readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
diff --git a/src/rio.c b/src/rio.c
index 5153ed28e..3513e1889 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -197,8 +197,17 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
broken++;
continue;
}
- retval = write(r->io.fdset.fds[j],p,count);
- if (retval != count) {
+
+ /* Make sure to write 'count' bytes to the socket regardless
+ * of short writes. */
+ size_t nwritten = 0;
+ while(nwritten != count) {
+ retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
+ if (retval <= 0) break;
+ nwritten += retval;
+ }
+
+ if (nwritten != count) {
/* Mark this FD as broken. */
r->io.fdset.state[j] = errno;
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;