summaryrefslogtreecommitdiff
path: root/src/rio.c
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-07-01 15:22:29 +0300
committerOran Agra <oran@redislabs.com>2019-07-08 15:37:48 +0300
commit2de544cfcc6d1aa7cf6d0c75a6116f7fc27b6fd6 (patch)
tree57d2e27d61fd4b5b369a7a3dfb7b8d7ce8d7ad74 /src/rio.c
parent722446510faf0debf0d309708b2ed4fe4d939319 (diff)
downloadredis-2de544cfcc6d1aa7cf6d0c75a6116f7fc27b6fd6.tar.gz
diskless replication on slave side (don't store rdb to file), plus some other related fixes
The implementation of the diskless replication was currently diskless only on the master side. The slave side was still storing the received rdb file to the disk before loading it back in and parsing it. This commit adds two modes to load rdb directly from socket: 1) when-empty 2) using "swapdb" the third mode of using diskless slave by flushdb is risky and currently not included. other changes: -------------- distinguish between aof configuration and state so that we can re-enable aof only when sync eventually succeeds (and not when exiting from readSyncBulkPayload after a failed attempt) also a CONFIG GET and INFO during rdb loading would have lied When loading rdb from the network, don't kill the server on short read (that can be a network error) Fix rdb check when performed on preamble AOF tests: run replication tests for diskless slave too make replication test a bit more aggressive Add test for diskless load swapdb
Diffstat (limited to 'src/rio.c')
-rw-r--r--src/rio.c109
1 files changed, 108 insertions, 1 deletions
diff --git a/src/rio.c b/src/rio.c
index c9c76b8f2..993768b56 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
+/* ------------------- File descriptor implementation ------------------- */
+
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
+ UNUSED(r);
+ UNUSED(buf);
+ UNUSED(len);
+ return 0; /* Error, this target does not yet support writing. */
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
+ size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
+
+ /* if the buffer is too small for the entire request: realloc */
+ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
+ r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
+
+ /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
+ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
+ sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ r->io.fd.pos = 0;
+ }
+
+ /* if we don't already have all the data in the sds, read more */
+ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
+ size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
+ size_t toread = len - buffered;
+ /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */
+ if (toread < PROTO_IOBUF_LEN)
+ toread = PROTO_IOBUF_LEN;
+ if (toread > sdsavail(r->io.fd.buf))
+ toread = sdsavail(r->io.fd.buf);
+ if (r->io.fd.read_limit != 0 &&
+ r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) {
+ if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered)
+ toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
+ else {
+ errno = EOVERFLOW;
+ return 0;
+ }
+ }
+ int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread);
+ if (retval <= 0) {
+ if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0;
+ }
+ sdsIncrLen(r->io.fd.buf, retval);
+ }
+
+ memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len);
+ r->io.fd.read_so_far += len;
+ r->io.fd.pos += len;
+ return len;
+}
+
+/* Returns read/write position in file. */
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.read_so_far;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioFdFlush(rio *r) {
+ /* Our flush is implemented by the write method, that recognizes a
+ * buffer set to NULL with a count of zero as a flush request. */
+ return rioFdWrite(r,NULL,0);
+}
+
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
+ NULL, /* update_checksum */
+ 0, /* current checksum */
+ 0, /* bytes read or written */
+ 0, /* read/write chunk size */
+ { { NULL, 0 } } /* union for io-specific vars */
+};
+
+/* create an rio that implements a buffered read from an fd
+ * read_limit argument stops buffering when the reaching the limit */
+void rioInitWithFd(rio *r, int fd, size_t read_limit) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.read_limit = read_limit;
+ r->io.fd.read_so_far = 0;
+ r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
+ sdsclear(r->io.fd.buf);
+}
+
+/* release the rio stream.
+ * optionally returns the unread buffered data. */
+void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
+ if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
+ if (r->io.fd.pos > 0)
+ sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
+ *out_remainingBufferedData = r->io.fd.buf;
+ } else {
+ sdsfree(r->io.fd.buf);
+ if (out_remainingBufferedData)
+ *out_remainingBufferedData = NULL;
+ }
+ r->io.fd.buf = NULL;
+}
+
/* ------------------- File descriptors set implementation ------------------- */
/* Returns 1 or 0 for success/failure.
@@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
* disk I/O concentrated in very little time. When we fsync in an explicit
* way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
- serverAssert(r->read == rioFileIO.read);
+ if(r->write != rioFileIO.write) return;
r->io.file.autosync = bytes;
}