diff options
author | Oran Agra <oran@redislabs.com> | 2019-07-01 15:22:29 +0300 |
---|---|---|
committer | Oran Agra <oran@redislabs.com> | 2019-07-08 15:37:48 +0300 |
commit | 2de544cfcc6d1aa7cf6d0c75a6116f7fc27b6fd6 (patch) | |
tree | 57d2e27d61fd4b5b369a7a3dfb7b8d7ce8d7ad74 /src/rio.c | |
parent | 722446510faf0debf0d309708b2ed4fe4d939319 (diff) | |
download | redis-2de544cfcc6d1aa7cf6d0c75a6116f7fc27b6fd6.tar.gz |
diskless replication on slave side (don't store rdb to file), plus some other related fixes
Until now, the implementation of diskless replication was diskless only on the master side.
The slave side was still storing the received rdb file to the disk before loading it back in and parsing it.
This commit adds two modes to load rdb directly from socket:
1) when-empty
2) using "swapdb"
A third mode, making the slave diskless by using flushdb, is risky and currently not included.
other changes:
--------------
distinguish between aof configuration and state so that we can re-enable aof only when sync eventually
succeeds (and not when exiting from readSyncBulkPayload after a failed attempt)
also a CONFIG GET and INFO during rdb loading would have lied
When loading rdb from the network, don't kill the server on short read (that can be a network error)
Fix rdb check when performed on preamble AOF
tests:
run replication tests for diskless slave too
make replication test a bit more aggressive
Add test for diskless load swapdb
Diffstat (limited to 'src/rio.c')
-rw-r--r-- | src/rio.c | 109 |
1 files changed, 108 insertions, 1 deletions
@@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } +/* ------------------- File descriptor implementation ------------------- */ + +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not yet support writing. */ +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdRead(rio *r, void *buf, size_t len) { + size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; + + /* if the buffer is too small for the entire request: realloc */ + if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) + r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); + + /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ + if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + r->io.fd.pos = 0; + } + + /* if we don't already have all the data in the sds, read more */ + while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { + size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; + size_t toread = len - buffered; + /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */ + if (toread < PROTO_IOBUF_LEN) + toread = PROTO_IOBUF_LEN; + if (toread > sdsavail(r->io.fd.buf)) + toread = sdsavail(r->io.fd.buf); + if (r->io.fd.read_limit != 0 && + r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) { + if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) + toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; + else { + errno = EOVERFLOW; + return 0; + } + } + int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread); + if (retval <= 0) { + if (errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; + } + sdsIncrLen(r->io.fd.buf, retval); + } + + memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len); + r->io.fd.read_so_far += len; + r->io.fd.pos += len; + return len; +} + +/* Returns 
read/write position in file. */ +static off_t rioFdTell(rio *r) { + return r->io.fd.read_so_far; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdWrite(r,NULL,0); +} + +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +/* create an rio that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit */ +void rioInitWithFd(rio *r, int fd, size_t read_limit) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.read_limit = read_limit; + r->io.fd.read_so_far = 0; + r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(r->io.fd.buf); +} + +/* release the rio stream. + * optionally returns the unread buffered data. */ +void rioFreeFd(rio *r, sds* out_remainingBufferedData) { + if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { + if (r->io.fd.pos > 0) + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + *out_remainingBufferedData = r->io.fd.buf; + } else { + sdsfree(r->io.fd.buf); + if (out_remainingBufferedData) + *out_remainingBufferedData = NULL; + } + r->io.fd.buf = NULL; +} + /* ------------------- File descriptors set implementation ------------------- */ /* Returns 1 or 0 for success/failure. @@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { * disk I/O concentrated in very little time. When we fsync in an explicit * way instead the I/O pressure is more distributed across time. 
*/ void rioSetAutoSync(rio *r, off_t bytes) { - serverAssert(r->read == rioFileIO.read); + if(r->write != rioFileIO.write) return; r->io.file.autosync = bytes; } |