summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchenyang8094 <chenyang8094@users.noreply.github.com>2022-01-04 01:14:13 +0800
committerGitHub <noreply@github.com>2022-01-03 19:14:13 +0200
commit87789fae0b767e47fef78ee994434554f2bf2f31 (patch)
tree55609bc8e86dee0bd9eceb8b83e321154383220c /src
parent78a62c012438a8ae23161f6cfef8159e3a254524 (diff)
downloadredis-87789fae0b767e47fef78ee994434554f2bf2f31.tar.gz
Implement Multi Part AOF mechanism to avoid AOFRW overheads. (#9788)
Implement Multi-Part AOF mechanism to avoid overheads during AOFRW. Introducing a folder with multiple AOF files tracked by a manifest file. The main issues with the original AOFRW mechanism are: * buffering of commands that are processed during rewrite (consuming a lot of RAM) * freezes of the main process when the AOFRW completes to drain the remaining part of the buffer and fsync it. * double disk IO for the data that arrives during AOFRW (had to be written to both the old and new AOF files) The main modifications of this PR: 1. Remove the AOF rewrite buffer and related code. 2. Divide the AOF into multiple files, they are classified as two types, one is the `BASE` type, it represents the full amount of data (Maybe AOF or RDB format) after each AOFRW, there is only one `BASE` file at most. The second is `INCR` type, may have more than one. They represent the incremental commands since the last AOFRW. 3. Use an AOF manifest file to record and manage these AOF files mentioned above. 4. The original configuration of `appendfilename` will be the base part of the new file name, for example: `appendonly.aof.1.base.rdb` and `appendonly.aof.2.incr.aof` 5. Add manifest-related TCL tests, and modified some existing tests that depend on the `appendfilename` 6. Remove the `aof_rewrite_buffer_length` field in info. 7. Add `aof-disable-auto-gc` configuration. By default we're automatically deleting HISTORY type AOFs. It also gives users the opportunity to preserve the history AOFs. This is just for testing use for now. 8. Add AOFRW limiting measure. When the AOFRW failures reach the threshold (3 times now), we will delay the execution of the next AOFRW by 1 minute. If the next AOFRW also fails, it will be delayed by 2 minutes. The next is 4, 8, 16, the maximum delay is 60 minutes (1 hour). During the limit period, we can still use the 'bgrewriteaof' command to execute AOFRW immediately. 9. Support upgrade (load) data from old version redis. 10. 
Add `appenddirname` configuration, as the directory name of the append only files. All AOF files and manifest file will be placed in this directory. 11. Only the last AOF file (BASE or INCR) can be truncated. Otherwise redis will exit even if `aof-load-truncated` is enabled. Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src')
-rw-r--r--src/aof.c1492
-rw-r--r--src/config.c29
-rw-r--r--src/debug.c5
-rw-r--r--src/evict.c2
-rw-r--r--src/latency.c2
-rw-r--r--src/object.c1
-rw-r--r--src/rdb.c43
-rw-r--r--src/redis-check-rdb.c7
-rw-r--r--src/server.c38
-rw-r--r--src/server.h65
-rw-r--r--src/util.c80
-rw-r--r--src/util.h5
12 files changed, 1199 insertions, 570 deletions
diff --git a/src/aof.c b/src/aof.c
index 764b5e0cf..eba247544 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -41,189 +41,775 @@
#include <sys/wait.h>
#include <sys/param.h>
-void aofUpdateCurrentSize(void);
-void aofClosePipes(void);
void freeClientArgv(client *c);
+off_t getAppendOnlyFileSize(sds filename);
+off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am);
+int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am);
+int aofFileExist(char *filename);
/* ----------------------------------------------------------------------------
- * AOF rewrite buffer implementation.
+ * AOF Manifest file implementation.
*
- * The following code implement a simple buffer used in order to accumulate
- * changes while the background process is rewriting the AOF file.
+ * The following code implements the read/write logic of AOF manifest file, which
+ * is used to track and manage all AOF files.
*
- * We only need to append, but can't just use realloc with a large block
- * because 'huge' reallocs are not always handled as one could expect
- * (via remapping of pages at OS level) but may involve copying data.
+ * Append-only files consist of three types:
*
- * For this reason we use a list of blocks, every block is
- * AOF_RW_BUF_BLOCK_SIZE bytes.
+ * BASE: Represents a Redis snapshot from the time of last AOF rewrite. The manifest
+ * file contains at most a single BASE file, which will always be the first file in the
+ * list.
+ *
+ * INCR: Represents all write commands executed by Redis following the last successful
+ * AOF rewrite. In some cases it is possible to have several ordered INCR files. For
+ * example:
+ * - During an on-going AOF rewrite
+ * - After an AOF rewrite was aborted/failed, and before the next one succeeded.
+ *
+ * HISTORY: After a successful rewrite, the previous BASE and INCR become HISTORY files.
+ * They will be automatically removed unless garbage collection is disabled.
+ *
+ * The following is a possible AOF manifest file content:
+ *
+ * file appendonly.aof.2.base.rdb seq 2 type b
+ * file appendonly.aof.1.incr.aof seq 1 type h
+ * file appendonly.aof.2.incr.aof seq 2 type h
+ * file appendonly.aof.3.incr.aof seq 3 type h
+ * file appendonly.aof.4.incr.aof seq 4 type i
+ * file appendonly.aof.5.incr.aof seq 5 type i
* ------------------------------------------------------------------------- */
-#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */
-
-typedef struct aofrwblock {
- unsigned long used, free, pos;
- /* Note that 'buf' must be the last field of aofrwblock struct, because
- * memory allocator may give us more memory than our apply for reducing
- * fragments, but we want to make full use of given memory, i.e. we may
- * access the memory after 'buf'. To avoid make others fields corrupt,
- * 'buf' must be the last one. */
- char buf[AOF_RW_BUF_BLOCK_SIZE];
-} aofrwblock;
-
-/* This function free the old AOF rewrite buffer if needed, and initialize
- * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
- * so can be used for the first initialization as well. */
-void aofRewriteBufferReset(void) {
- if (server.aof_rewrite_buf_blocks)
- listRelease(server.aof_rewrite_buf_blocks);
-
- server.aof_rewrite_buf_blocks = listCreate();
- listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
+/* Naming rules. */
+#define BASE_FILE_SUFFIX ".base"
+#define INCR_FILE_SUFFIX ".incr"
+#define RDB_FORMAT_SUFFIX ".rdb"
+#define AOF_FORMAT_SUFFIX ".aof"
+#define MANIFEST_NAME_SUFFIX ".manifest"
+#define MANIFEST_TEMP_NAME_PREFIX "temp_"
+
+/* AOF manifest key. */
+#define AOF_MANIFEST_KEY_FILE_NAME "file"
+#define AOF_MANIFEST_KEY_FILE_SEQ "seq"
+#define AOF_MANIFEST_KEY_FILE_TYPE "type"
+
+/* Create an empty aofInfo. */
+aofInfo *aofInfoCreate(void) {
+ return zcalloc(sizeof(aofInfo));
+}
+
+/* Free the aofInfo structure (pointed to by ai) and its embedded file_name. */
+void aofInfoFree(aofInfo *ai) {
+ serverAssert(ai != NULL);
+ if (ai->file_name) sdsfree(ai->file_name);
+ zfree(ai);
+}
+
+/* Deep copy an aofInfo. */
+aofInfo *aofInfoDup(aofInfo *orig) {
+ serverAssert(orig != NULL);
+ aofInfo *ai = aofInfoCreate();
+ ai->file_name = sdsdup(orig->file_name);
+ ai->file_seq = orig->file_seq;
+ ai->file_type = orig->file_type;
+ return ai;
+}
+
+/* Method to free AOF list elements. */
+void aofListFree(void *item) {
+ aofInfo *ai = (aofInfo *)item;
+ aofInfoFree(ai);
+}
+
+/* Method to duplicate AOF list elements. */
+void *aofListDup(void *item) {
+ return aofInfoDup(item);
}
-/* Return the current size of the AOF rewrite buffer. */
-unsigned long aofRewriteBufferSize(void) {
+/* Create an empty aofManifest, which will be called in `aofLoadManifestFromDisk`. */
+aofManifest *aofManifestCreate(void) {
+ aofManifest *am = zcalloc(sizeof(aofManifest));
+ am->incr_aof_list = listCreate();
+ am->history_aof_list = listCreate();
+ listSetFreeMethod(am->incr_aof_list, aofListFree);
+ listSetDupMethod(am->incr_aof_list, aofListDup);
+ listSetFreeMethod(am->history_aof_list, aofListFree);
+ listSetDupMethod(am->history_aof_list, aofListDup);
+ return am;
+}
+
+/* Free the aofManifest structure (pointed to by am) and its embedded members. */
+void aofManifestFree(aofManifest *am) {
+ if (am->base_aof_info) aofInfoFree(am->base_aof_info);
+ if (am->incr_aof_list) listRelease(am->incr_aof_list);
+ if (am->history_aof_list) listRelease(am->history_aof_list);
+ zfree(am);
+}
+
+sds getAofManifestFileName() {
+ return sdscatprintf(sdsempty(), "%s%s", server.aof_filename,
+ MANIFEST_NAME_SUFFIX);
+}
+
+sds getTempAofManifestFileName() {
+ return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX,
+ server.aof_filename, MANIFEST_NAME_SUFFIX);
+}
+
+/* Returns the string representation of aofManifest pointed to by am.
+ *
+ * The string is multiple lines separated by '\n', and each line represents
+ * an AOF file.
+ *
+ * Each line is space delimited and contains 6 fields, as follows:
+ * "file" [filename] "seq" [sequence] "type" [type]
+ *
+ * Where "file", "seq" and "type" are keywords that describe the next value,
+ * [filename] and [sequence] describe file name and order, and [type] is one
+ * of 'b' (base), 'h' (history) or 'i' (incr).
+ *
+ * The base file, if exists, will always be first, followed by history files,
+ * and incremental files.
+ */
+#define AOF_INFO_FORMAT_AND_CAT(buf, info) \
+ sdscatprintf((buf), "%s %s %s %lld %s %c\n", \
+ AOF_MANIFEST_KEY_FILE_NAME, (info)->file_name, \
+ AOF_MANIFEST_KEY_FILE_SEQ, (info)->file_seq, \
+ AOF_MANIFEST_KEY_FILE_TYPE, (info)->file_type)
+
+sds getAofManifestAsString(aofManifest *am) {
+ serverAssert(am != NULL);
+
+ sds buf = sdsempty();
listNode *ln;
listIter li;
- unsigned long size = 0;
- listRewind(server.aof_rewrite_buf_blocks,&li);
- while((ln = listNext(&li))) {
- aofrwblock *block = listNodeValue(ln);
- size += block->used;
+ /* 1. Add BASE File information, it is always at the beginning
+ * of the manifest file. */
+ if (am->base_aof_info) {
+ buf = AOF_INFO_FORMAT_AND_CAT(buf, am->base_aof_info);
}
- return size;
-}
-/* This function is different from aofRewriteBufferSize, to get memory usage,
- * we should also count all other fields(except 'buf') of aofrwblock and the
- * last block's free size. */
-unsigned long aofRewriteBufferMemoryUsage(void) {
- unsigned long size = aofRewriteBufferSize();
+ /* 2. Add HISTORY type AOF information. */
+ listRewind(am->history_aof_list, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ buf = AOF_INFO_FORMAT_AND_CAT(buf, ai);
+ }
- listNode *ln = listLast(server.aof_rewrite_buf_blocks);
- if (ln != NULL) {
- aofrwblock *block = listNodeValue(ln);
- size += block->free;
- size += (offsetof(aofrwblock,buf) *
- listLength(server.aof_rewrite_buf_blocks));
+ /* 3. Add INCR type AOF information. */
+ listRewind(am->incr_aof_list, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ buf = AOF_INFO_FORMAT_AND_CAT(buf, ai);
}
- return size;
+
+ return buf;
}
-/* Event handler used to send data to the child process doing the AOF
- * rewrite. We send pieces of our AOF differences buffer so that the final
- * write when the child finishes the rewrite will be small. */
-void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
- listNode *ln;
- aofrwblock *block;
- ssize_t nwritten;
- mstime_t latency;
- UNUSED(el);
- UNUSED(fd);
- UNUSED(privdata);
- UNUSED(mask);
+/* Load the manifest information from the disk to `server.aof_manifest`
+ * when the Redis server starts.
+ *
+ * During loading, this function does strict error checking and will abort
+ * the entire Redis server process on error (I/O error, invalid format, etc.)
+ *
+ * If the AOF directory or manifest file do not exist, this will be ignored
+ * in order to support seamless upgrades from previous versions which did not
+ * use them.
+ */
+#define MANIFEST_MAX_LINE 1024
+void aofLoadManifestFromDisk(void) {
+ const char *err = NULL;
+ long long maxseq = 0;
- latencyStartMonitor(latency);
- while(1) {
- ln = listFirst(server.aof_rewrite_buf_blocks);
- block = ln ? ln->value : NULL;
- if (server.aof_stop_sending_diff || !block) {
- aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
- AE_WRITABLE);
- break;
+ server.aof_manifest = aofManifestCreate();
+
+ if (!dirExists(server.aof_dirname)) {
+ serverLog(LL_NOTICE, "The AOF directory %s doesn't exist", server.aof_dirname);
+ return;
+ }
+
+ sds am_name = getAofManifestFileName();
+ sds am_filepath = makePath(server.aof_dirname, am_name);
+ if (!fileExist(am_filepath)) {
+ serverLog(LL_NOTICE, "The AOF manifest file %s doesn't exist", am_name);
+ sdsfree(am_name);
+ sdsfree(am_filepath);
+ return;
+ }
+
+ FILE *fp = fopen(am_filepath, "r");
+ if (fp == NULL) {
+ serverLog(LL_WARNING, "Fatal error: can't open the AOF manifest "
+ "file %s for reading: %s", am_name, strerror(errno));
+ exit(1);
+ }
+
+ sdsfree(am_name);
+ sdsfree(am_filepath);
+
+ char buf[MANIFEST_MAX_LINE+1];
+ sds *argv = NULL;
+ int argc;
+ aofInfo *ai = NULL;
+
+ sds line = NULL;
+ int linenum = 0;
+
+ while (1) {
+ if (fgets(buf, MANIFEST_MAX_LINE+1, fp) == NULL) {
+ if (feof(fp)) {
+ if (linenum == 0) {
+ err = "Found an empty AOF manifest";
+ goto loaderr;
+ } else {
+ break;
+ }
+ } else {
+ err = "Read AOF manifest failed";
+ goto loaderr;
+ }
}
- if (block->used != block->pos) {
- nwritten = write(server.aof_pipe_write_data_to_child,
- block->buf+block->pos,block->used-block->pos);
- if (nwritten <= 0) break;
- block->pos += nwritten;
+
+ linenum++;
+
+ /* Skip comment lines. */
+ if (buf[0] == '#') continue;
+
+ if (strchr(buf, '\n') == NULL) {
+ err = "The AOF manifest file contains too long line";
+ goto loaderr;
}
- if (block->used == block->pos) listDelNode(server.aof_rewrite_buf_blocks,ln);
+
+ line = sdstrim(sdsnew(buf), " \t\r\n");
+ argv = sdssplitargs(line, &argc);
+ /* 'argc < 6' was done for forward compatibility. */
+ if (argv == NULL || argc < 6 || (argc % 2)) {
+ err = "The AOF manifest file is invalid format";
+ goto loaderr;
+ }
+
+ ai = aofInfoCreate();
+ for (int i = 0; i < argc; i += 2) {
+ if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_NAME)) {
+ ai->file_name = sdsnew(argv[i+1]);
+ if (!pathIsBaseName(ai->file_name)) {
+ err = "File can't be a path, just a filename";
+ goto loaderr;
+ }
+ } else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_SEQ)) {
+ ai->file_seq = atoll(argv[i+1]);
+ } else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_TYPE)) {
+ ai->file_type = (argv[i+1])[0];
+ }
+ /* else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_OTHER)) {} */
+ }
+
+ /* We have to make sure we load all the information. */
+ if (!ai->file_name || !ai->file_seq || !ai->file_type) {
+ err = "Mismatched manifest key";
+ goto loaderr;
+ }
+
+ sdsfreesplitres(argv, argc);
+ argv = NULL;
+
+ if (ai->file_type == AOF_FILE_TYPE_BASE) {
+ if (server.aof_manifest->base_aof_info) {
+ err = "Found duplicate base file information";
+ goto loaderr;
+ }
+ server.aof_manifest->base_aof_info = ai;
+ server.aof_manifest->curr_base_file_seq = ai->file_seq;
+ } else if (ai->file_type == AOF_FILE_TYPE_HIST) {
+ listAddNodeTail(server.aof_manifest->history_aof_list, ai);
+ } else if (ai->file_type == AOF_FILE_TYPE_INCR) {
+ if (ai->file_seq <= maxseq) {
+ err = "Found a non-monotonic sequence number";
+ goto loaderr;
+ }
+ listAddNodeTail(server.aof_manifest->incr_aof_list, ai);
+ server.aof_manifest->curr_incr_file_seq = ai->file_seq;
+ maxseq = ai->file_seq;
+ } else {
+ err = "Unknown AOF file type";
+ goto loaderr;
+ }
+
+ sdsfree(line);
+ line = NULL;
+ ai = NULL;
}
- latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-rewrite-write-data-to-child",latency);
+
+ fclose(fp);
+ return;
+
+loaderr:
+ /* Sanitizer suppression: may report a false positive if we goto loaderr
+ * and exit(1) without freeing these allocations. */
+ if (argv) sdsfreesplitres(argv, argc);
+ if (ai) aofInfoFree(ai);
+
+ serverLog(LL_WARNING, "\n*** FATAL AOF MANIFEST FILE ERROR ***\n");
+ if (line) {
+ serverLog(LL_WARNING, "Reading the manifest file, at line %d\n", linenum);
+ serverLog(LL_WARNING, ">>> '%s'\n", line);
+ }
+ serverLog(LL_WARNING, "%s\n", err);
+ exit(1);
+}
+
+/* Deep copy an aofManifest from orig.
+ *
+ * In `backgroundRewriteDoneHandler` and `openNewIncrAofForAppend`, we will
+ * first deep copy a temporary AOF manifest from the `server.aof_manifest` and
+ * try to modify it. Once everything is modified, we will atomically make the
+ * `server.aof_manifest` point to this temporary aof_manifest.
+ */
+aofManifest *aofManifestDup(aofManifest *orig) {
+ serverAssert(orig != NULL);
+ aofManifest *am = zcalloc(sizeof(aofManifest));
+
+ am->curr_base_file_seq = orig->curr_base_file_seq;
+ am->curr_incr_file_seq = orig->curr_incr_file_seq;
+ am->dirty = orig->dirty;
+
+ if (orig->base_aof_info) {
+ am->base_aof_info = aofInfoDup(orig->base_aof_info);
+ }
+
+ am->incr_aof_list = listDup(orig->incr_aof_list);
+ am->history_aof_list = listDup(orig->history_aof_list);
+ serverAssert(am->incr_aof_list != NULL);
+ serverAssert(am->history_aof_list != NULL);
+ return am;
+}
+
+/* Change the `server.aof_manifest` pointer to 'am' and free the previous
+ * one if we have. */
+void aofManifestFreeAndUpdate(aofManifest *am) {
+ serverAssert(am != NULL);
+ if (server.aof_manifest) aofManifestFree(server.aof_manifest);
+ server.aof_manifest = am;
+}
+
+/* Called in `backgroundRewriteDoneHandler` to get a new BASE file
+ * name, and mark the previous (if we have) BASE file as HISTORY type.
+ *
+ * BASE file naming rules: `server.aof_filename`.seq.base.format
+ *
+ * for example:
+ * appendonly.aof.1.base.aof (server.aof_use_rdb_preamble is no)
+ * appendonly.aof.1.base.rdb (server.aof_use_rdb_preamble is yes)
+ */
+sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
+ serverAssert(am != NULL);
+ if (am->base_aof_info) {
+ serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
+ am->base_aof_info->file_type = AOF_FILE_TYPE_HIST;
+ listAddNodeHead(am->history_aof_list, am->base_aof_info);
+ }
+
+ char *format_suffix = server.aof_use_rdb_preamble ?
+ RDB_FORMAT_SUFFIX:AOF_FORMAT_SUFFIX;
+
+ aofInfo *ai = aofInfoCreate();
+ ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename,
+ ++am->curr_base_file_seq, BASE_FILE_SUFFIX, format_suffix);
+ ai->file_seq = am->curr_base_file_seq;
+ ai->file_type = AOF_FILE_TYPE_BASE;
+ am->base_aof_info = ai;
+ am->dirty = 1;
+ return am->base_aof_info->file_name;
}
-/* Append data to the AOF rewrite buffer, allocating new blocks if needed.
+/* Get a new INCR type AOF name.
*
- * Sanitizer suppression: zmalloc_usable() confuses sanitizer, it generates
- * a false positive out-of-bounds error */
-REDIS_NO_SANITIZE("bounds")
-void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
- listNode *ln = listLast(server.aof_rewrite_buf_blocks);
- aofrwblock *block = ln ? ln->value : NULL;
+ * INCR AOF naming rules: `server.aof_filename`.seq.incr.aof
+ *
+ * for example:
+ * appendonly.aof.1.incr.aof
+ */
+sds getNewIncrAofName(aofManifest *am) {
+ aofInfo *ai = aofInfoCreate();
+ ai->file_type = AOF_FILE_TYPE_INCR;
+ ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename,
+ ++am->curr_incr_file_seq, INCR_FILE_SUFFIX, AOF_FORMAT_SUFFIX);
+ ai->file_seq = am->curr_incr_file_seq;
+ listAddNodeTail(am->incr_aof_list, ai);
+ am->dirty = 1;
+ return ai->file_name;
+}
+/* Get the last INCR AOF name or create a new one. */
+sds getLastIncrAofName(aofManifest *am) {
+ serverAssert(am != NULL);
+
+ /* If 'incr_aof_list' is empty, just create a new one. */
+ if (!listLength(am->incr_aof_list)) {
+ return getNewIncrAofName(am);
+ }
+
+ /* Or return the last one. */
+ listNode *lastnode = listIndex(am->incr_aof_list, -1);
+ aofInfo *ai = listNodeValue(lastnode);
+ return ai->file_name;
+}
+
+/* Called in `backgroundRewriteDoneHandler`. When AOFRW succeeds, this
+ * function changes the AOF file type in 'incr_aof_list' from
+ * AOF_FILE_TYPE_INCR to AOF_FILE_TYPE_HIST, and moves them to the
+ * 'history_aof_list'.
+ */
+void markRewrittenIncrAofAsHistory(aofManifest *am) {
+ serverAssert(am != NULL);
+ if (!listLength(am->incr_aof_list)) {
+ return;
+ }
+
+ listNode *ln;
+ listIter li;
+
+ listRewindTail(am->incr_aof_list, &li);
+
+ /* "server.aof_fd != -1" means AOF is enabled, so we must skip the
+ * last AOF, because it is the file we are currently writing to. */
+ if (server.aof_fd != -1) {
+ ln = listNext(&li);
+ serverAssert(ln != NULL);
+ }
+
+ /* Move aofInfo from 'incr_aof_list' to 'history_aof_list'. */
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
+
+ aofInfo *hai = aofInfoDup(ai);
+ hai->file_type = AOF_FILE_TYPE_HIST;
+ listAddNodeHead(am->history_aof_list, hai);
+ listDelNode(am->incr_aof_list, ln);
+ }
+
+ am->dirty = 1;
+}
+
+/* Write the formatted manifest string to disk. */
+int writeAofManifestFile(sds buf) {
+ int ret = C_OK;
+ ssize_t nwritten;
+ int len;
+
+ sds am_name = getAofManifestFileName();
+ sds am_filepath = makePath(server.aof_dirname, am_name);
+ sds tmp_am_name = getTempAofManifestFileName();
+ sds tmp_am_filepath = makePath(server.aof_dirname, tmp_am_name);
+
+ int fd = open(tmp_am_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
+ if (fd == -1) {
+ serverLog(LL_WARNING, "Can't open the AOF manifest file %s: %s",
+ tmp_am_name, strerror(errno));
+
+ ret = C_ERR;
+ goto cleanup;
+ }
+
+ len = sdslen(buf);
while(len) {
- /* If we already got at least an allocated block, try appending
- * at least some piece into it. */
- if (block) {
- unsigned long thislen = (block->free < len) ? block->free : len;
- if (thislen) { /* The current block is not already full. */
- memcpy(block->buf+block->used, s, thislen);
- block->used += thislen;
- block->free -= thislen;
- s += thislen;
- len -= thislen;
- }
- }
+ nwritten = write(fd, buf, len);
- if (len) { /* First block to allocate, or need another block. */
- int numblocks;
- size_t usable_size;
-
- block = zmalloc_usable(sizeof(*block), &usable_size);
- block->free = usable_size-offsetof(aofrwblock,buf);
- block->used = 0;
- block->pos = 0;
- listAddNodeTail(server.aof_rewrite_buf_blocks,block);
-
- /* Log every time we cross more 10 or 100 blocks, respectively
- * as a notice or warning. */
- numblocks = listLength(server.aof_rewrite_buf_blocks);
- if (((numblocks+1) % 10) == 0) {
- int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
- LL_NOTICE;
- serverLog(level,"Background AOF buffer size: %lu MB",
- aofRewriteBufferSize()/(1024*1024));
- }
+ if (nwritten < 0) {
+ if (errno == EINTR) continue;
+
+ serverLog(LL_WARNING, "Error trying to write the temporary AOF manifest file %s: %s",
+ tmp_am_name, strerror(errno));
+
+ ret = C_ERR;
+ goto cleanup;
}
+
+ len -= nwritten;
+ buf += nwritten;
}
- /* Install a file event to send data to the rewrite child if there is
- * not one already. */
- if (!server.aof_stop_sending_diff &&
- aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0)
- {
- aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
- AE_WRITABLE, aofChildWriteDiffData, NULL);
+ if (redis_fsync(fd) == -1) {
+ serverLog(LL_WARNING, "Fail to fsync the temp AOF file %s: %s.",
+ tmp_am_name, strerror(errno));
+
+ ret = C_ERR;
+ goto cleanup;
+ }
+
+ if (rename(tmp_am_filepath, am_filepath) != 0) {
+ serverLog(LL_WARNING,
+ "Error trying to rename the temporary AOF manifest file %s into %s: %s",
+ tmp_am_name, am_name, strerror(errno));
+
+ ret = C_ERR;
}
+
+cleanup:
+ if (fd != -1) close(fd);
+ sdsfree(am_name);
+ sdsfree(am_filepath);
+ sdsfree(tmp_am_name);
+ sdsfree(tmp_am_filepath);
+ return ret;
}
-/* Write the buffer (possibly composed of multiple blocks) into the specified
- * fd. If a short write or any other error happens -1 is returned,
- * otherwise the number of bytes written is returned. */
-ssize_t aofRewriteBufferWrite(int fd) {
+/* Persist the aofManifest information pointed to by am to disk. */
+int persistAofManifest(aofManifest *am) {
+ if (am->dirty == 0) {
+ return C_OK;
+ }
+
+ sds amstr = getAofManifestAsString(am);
+ int ret = writeAofManifestFile(amstr);
+ sdsfree(amstr);
+ if (ret == C_OK) am->dirty = 0;
+ return ret;
+}
+
+/* Called in `loadAppendOnlyFiles` when we upgrade from an old version of Redis.
+ *
+ * 1) Create the AOF directory, using 'server.aof_dirname' as the name.
+ * 2) Use 'server.aof_filename' to construct a BASE type aofInfo and add it to
+ * aofManifest, then persist the manifest file to the AOF directory.
+ * 3) Move the old AOF file (server.aof_filename) to the AOF directory.
+ *
+ * If any of the above steps fails or a crash occurs, this will not cause any
+ * problems, and Redis will retry the upgrade process when it restarts.
+ */
+void aofUpgradePrepare(aofManifest *am) {
+ serverAssert(!aofFileExist(server.aof_filename));
+
+ /* Create the AOF directory, using 'server.aof_dirname' as the name. */
+ if (dirCreateIfMissing(server.aof_dirname) == -1) {
+ serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
+ server.aof_dirname, strerror(errno));
+ exit(1);
+ }
+
+ /* Manually construct a BASE type aofInfo and add it to aofManifest. */
+ if (am->base_aof_info) aofInfoFree(am->base_aof_info);
+ aofInfo *ai = aofInfoCreate();
+ ai->file_name = sdsnew(server.aof_filename);
+ ai->file_seq = 1;
+ ai->file_type = AOF_FILE_TYPE_BASE;
+ am->base_aof_info = ai;
+ am->curr_base_file_seq = 1;
+ am->dirty = 1;
+
+ /* Persist the manifest file to AOF directory. */
+ if (persistAofManifest(am) != C_OK) {
+ exit(1);
+ }
+
+ /* Move the old AOF file to AOF directory. */
+ sds aof_filepath = makePath(server.aof_dirname, server.aof_filename);
+ if (rename(server.aof_filename, aof_filepath) == -1) {
+ serverLog(LL_WARNING,
+ "Error trying to move the old AOF file %s into dir %s: %s",
+ server.aof_filename,
+ server.aof_dirname,
+ strerror(errno));
+ sdsfree(aof_filepath);
+ exit(1);;
+ }
+ sdsfree(aof_filepath);
+
+ serverLog(LL_NOTICE, "Successfully migrated an old-style AOF file (%s) into the AOF directory (%s).",
+ server.aof_filename, server.aof_dirname);
+}
+
+/* When AOFRW succeeds, the previous BASE and INCR AOFs will
+ * become HISTORY type and be moved into 'history_aof_list'.
+ *
+ * The function will traverse the 'history_aof_list' and submit
+ * the delete task to the bio thread.
+ */
+int aofDelHistoryFiles(void) {
+ if (server.aof_manifest == NULL ||
+ server.aof_disable_auto_gc == 1 ||
+ !listLength(server.aof_manifest->history_aof_list))
+ {
+ return C_OK;
+ }
+
listNode *ln;
listIter li;
- ssize_t count = 0;
-
- listRewind(server.aof_rewrite_buf_blocks,&li);
- while((ln = listNext(&li))) {
- aofrwblock *block = listNodeValue(ln);
- ssize_t nwritten;
-
- if (block->used != block->pos) {
- nwritten = write(fd,block->buf+block->pos,block->used-block->pos);
- if (nwritten != (ssize_t)(block->used-block->pos)) {
- if (nwritten == 0) errno = EIO;
- return -1;
+
+ listRewind(server.aof_manifest->history_aof_list, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ serverAssert(ai->file_type == AOF_FILE_TYPE_HIST);
+ serverLog(LL_NOTICE, "Removing the history file %s in the background", ai->file_name);
+ sds aof_filepath = makePath(server.aof_dirname, ai->file_name);
+ bg_unlink(aof_filepath);
+ sdsfree(aof_filepath);
+ listDelNode(server.aof_manifest->history_aof_list, ln);
+ }
+
+ server.aof_manifest->dirty = 1;
+ return persistAofManifest(server.aof_manifest);
+}
+
+/* Called after `loadDataFromDisk` when Redis starts. If `server.aof_state` is
+ * 'AOF_ON', it will do two things:
+ * 1. Open the last opened INCR type AOF for writing; if there is none, create a new one
+ * 2. Synchronously update the manifest file to the disk
+ *
+ * If any of the above two steps fails, the redis process will exit.
+ */
+void aofOpenIfNeededOnServerStart(void) {
+ if (server.aof_state != AOF_ON) {
+ return;
+ }
+
+ serverAssert(server.aof_manifest != NULL);
+ serverAssert(server.aof_fd == -1);
+
+ if (dirCreateIfMissing(server.aof_dirname) == -1) {
+ serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
+ server.aof_dirname, strerror(errno));
+ exit(1);
+ }
+
+ /* Because we will 'exit(1)' if opening the AOF or persisting the manifest
+ * fails, we don't need atomic modification here. */
+ sds aof_name = getLastIncrAofName(server.aof_manifest);
+
+ /* Here we should use 'O_APPEND' flag. */
+ sds aof_filepath = makePath(server.aof_dirname, aof_name);
+ server.aof_fd = open(aof_filepath, O_WRONLY|O_APPEND|O_CREAT, 0644);
+ sdsfree(aof_filepath);
+ if (server.aof_fd == -1) {
+ serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
+ aof_name, strerror(errno));
+ exit(1);
+ }
+
+ int ret = persistAofManifest(server.aof_manifest);
+ if (ret != C_OK) {
+ exit(1);
+ }
+
+ server.aof_last_incr_size = getAppendOnlyFileSize(aof_name);
+}
+
+int aofFileExist(char *filename) {
+ sds file_path = makePath(server.aof_dirname, filename);
+ int ret = fileExist(file_path);
+ sdsfree(file_path);
+ return ret;
+}
+
+/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
+ * is 'AOF_ON' or 'AOF_WAIT_REWRITE', it will do two things:
+ * 1. Open a new INCR type AOF for writing
+ * 2. Synchronously update the manifest file to the disk
+ *
+ * The above two steps of modification are atomic, that is, if
+ * any step fails, the entire operation will roll back and return
+ * C_ERR, and if all succeed, it returns C_OK.
+ * */
+int openNewIncrAofForAppend(void) {
+ serverAssert(server.aof_manifest != NULL);
+ int newfd;
+
+ /* Only open new INCR AOF when AOF enabled. */
+ if (server.aof_state == AOF_OFF) return C_OK;
+
+ /* Dup a temp aof_manifest to modify. */
+ aofManifest *temp_am = aofManifestDup(server.aof_manifest);
+
+ /* Open new AOF. */
+ sds new_aof_name = getNewIncrAofName(temp_am);
+ sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
+ newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
+ sdsfree(new_aof_filepath);
+ if (newfd == -1) {
+ serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
+ new_aof_name, strerror(errno));
+
+ aofManifestFree(temp_am);
+ return C_ERR;
+ }
+
+ /* Persist AOF Manifest. */
+ int ret = persistAofManifest(temp_am);
+ if (ret == C_ERR) {
+ close(newfd);
+ aofManifestFree(temp_am);
+ return C_ERR;
+ }
+
+ /* If reaches here, we can safely modify the `server.aof_manifest`
+ * and `server.aof_fd`. */
+
+ /* Close old aof_fd if needed. */
+ if (server.aof_fd != -1) bioCreateCloseJob(server.aof_fd);
+ server.aof_fd = newfd;
+
+ /* Reset the aof_last_incr_size. */
+ server.aof_last_incr_size = 0;
+ /* Update `server.aof_manifest`. */
+ aofManifestFreeAndUpdate(temp_am);
+ return C_OK;
+}
+
+/* Whether to limit the execution of Background AOF rewrite.
+ *
+ * At present, if AOFRW fails, redis will automatically retry. If it continues
+ * to fail, we may get a lot of very small INCR files, so we need an AOFRW
+ * limiting measure.
+ *
+ * We can't directly use `server.aof_current_size` and `server.aof_last_incr_size`,
+ * because there may be no new writes after AOFRW fails.
+ *
+ * So, we use time delay to achieve our goal. When AOFRW fails, we delay the execution
+ * of the next AOFRW by 1 minute. If the next AOFRW also fails, it will be delayed by 2
+ * minutes. The next is 4, 8, 16, the maximum delay is 60 minutes (1 hour).
+ *
+ * During the limit period, we can still use the 'bgrewriteaof' command to execute AOFRW
+ * immediately.
+ *
+ * Return 1 means that AOFRW is limited and cannot be executed. 0 means that we can execute
+ * AOFRW, which may be that we have reached the 'next_rewrite_time' or the number of INCR
+ * AOFs has not reached the limit threshold.
+ * */
+#define AOF_REWRITE_LIMITE_THRESHOLD 3
+#define AOF_REWRITE_LIMITE_NAX_MINUTES 60 /* 1 hour */
+int aofRewriteLimited(void) {
+ int limit = 0;
+ static int limit_deley_minutes = 0;
+ static time_t next_rewrite_time = 0;
+
+ unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list);
+ if (incr_aof_num >= AOF_REWRITE_LIMITE_THRESHOLD) {
+ if (server.unixtime < next_rewrite_time) {
+ limit = 1;
+ } else {
+ if (limit_deley_minutes == 0) {
+ limit = 1;
+ limit_deley_minutes = 1;
+ } else {
+ limit_deley_minutes *= 2;
}
- count += nwritten;
+
+ if (limit_deley_minutes > AOF_REWRITE_LIMITE_NAX_MINUTES) {
+ limit_deley_minutes = AOF_REWRITE_LIMITE_NAX_MINUTES;
+ }
+
+ next_rewrite_time = server.unixtime + limit_deley_minutes * 60;
+
+ serverLog(LL_WARNING,
+ "Background AOF rewrite has repeatedly failed %ld times and triggered the limit, will retry in %d minutes",
+ incr_aof_num, limit_deley_minutes);
}
+ } else {
+ limit_deley_minutes = 0;
+ next_rewrite_time = 0;
}
- return count;
+
+ return limit;
}
/* ----------------------------------------------------------------------------
@@ -253,13 +839,9 @@ void killAppendOnlyChild(void) {
if (kill(server.child_pid,SIGUSR1) != -1) {
while(waitpid(-1, &statloc, 0) != server.child_pid);
}
- /* Reset the buffer accumulating changes while the child saves. */
- aofRewriteBufferReset();
aofRemoveTempFile(server.child_pid);
resetChildState();
server.aof_rewrite_time_start = -1;
- /* Close pipes used for IPC between the two processes. */
- aofClosePipes();
}
/* Called when the user switches from "appendonly yes" to "appendonly no"
@@ -279,6 +861,7 @@ void stopAppendOnly(void) {
server.aof_selected_db = -1;
server.aof_state = AOF_OFF;
server.aof_rewrite_scheduled = 0;
+ server.aof_last_incr_size = 0;
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
@@ -287,23 +870,9 @@ void stopAppendOnly(void) {
/* Called when the user switches from "appendonly no" to "appendonly yes"
* at runtime using the CONFIG command. */
int startAppendOnly(void) {
- char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
- int newfd;
-
- newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
serverAssert(server.aof_state == AOF_OFF);
- if (newfd == -1) {
- char *str_err = strerror(errno);
- char *cwdp = getcwd(cwd,MAXPATHLEN);
- serverLog(LL_WARNING,
- "Redis needs to enable the AOF but can't open the "
- "append only file %s (in server root dir %s): %s",
- server.aof_filename,
- cwdp ? cwdp : "unknown",
- str_err);
- return C_ERR;
- }
+ server.aof_state = AOF_WAIT_REWRITE;
if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
server.aof_rewrite_scheduled = 1;
serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
@@ -315,18 +884,14 @@ int startAppendOnly(void) {
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
killAppendOnlyChild();
}
+
if (rewriteAppendOnlyFileBackground() == C_ERR) {
- close(newfd);
+ server.aof_state = AOF_OFF;
serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
return C_ERR;
}
}
- /* We correctly switched on AOF, now wait for the rewrite to be complete
- * in order to append data on disk. */
- server.aof_state = AOF_WAIT_REWRITE;
server.aof_last_fsync = server.unixtime;
- server.aof_fd = newfd;
-
/* If AOF fsync error in bio job, we just ignore it and log the event. */
int aof_bio_fsync_status;
atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
@@ -490,7 +1055,7 @@ void flushAppendOnlyFile(int force) {
(long long)sdslen(server.aof_buf));
}
- if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
+ if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
@@ -524,6 +1089,7 @@ void flushAppendOnlyFile(int force) {
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
+ server.aof_last_incr_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
@@ -538,6 +1104,7 @@ void flushAppendOnlyFile(int force) {
}
}
server.aof_current_size += nwritten;
+ server.aof_last_incr_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
@@ -657,15 +1224,11 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
- if (server.aof_state == AOF_ON)
- server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
-
- /* If a background append only file rewriting is in progress we want to
- * accumulate the differences between the child DB and the current one
- * in a buffer, so that when the child process will do its work we
- * can append the differences to the new append only file. */
- if (server.child_type == CHILD_TYPE_AOF)
- aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
+ if (server.aof_state == AOF_ON ||
+ server.child_type == CHILD_TYPE_AOF)
+ {
+ server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
+ }
sdsfree(buf);
}
@@ -698,41 +1261,40 @@ struct client *createAOFClient(void) {
return c;
}
-/* Replay the append log file. On success AOF_OK is returned,
+/* Replay an append log file. On success AOF_OK or AOF_TRUNCATED is returned,
* otherwise, one of the following is returned:
* AOF_OPEN_ERR: Failed to open the AOF file.
* AOF_NOT_EXIST: AOF file doesn't exist.
* AOF_EMPTY: The AOF file is empty (nothing to load).
* AOF_FAILED: Failed to load the AOF file. */
-int loadAppendOnlyFile(char *filename) {
+int loadSingleAppendOnlyFile(char *filename) {
struct client *fakeClient;
- FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
- int ret;
+ off_t last_progress_report_size = 0;
+ int ret = C_OK;
+ sds aof_filepath = makePath(server.aof_dirname, filename);
+ FILE *fp = fopen(aof_filepath, "r");
if (fp == NULL) {
int en = errno;
- if (redis_stat(filename, &sb) == 0) {
- serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(en));
+ if (redis_stat(aof_filepath, &sb) == 0 || errno != ENOENT) {
+ serverLog(LL_WARNING,"Fatal error: can't open the append log file %s for reading: %s", filename, strerror(en));
+ sdsfree(aof_filepath);
return AOF_OPEN_ERR;
} else {
- serverLog(LL_WARNING,"The append log file doesn't exist: %s",strerror(errno));
+ serverLog(LL_WARNING,"The append log file %s doesn't exist: %s", filename, strerror(errno));
+ sdsfree(aof_filepath);
return AOF_NOT_EXIST;
}
}
- /* Handle a zero-length AOF file as a special case. An empty AOF file
- * is a valid AOF because an empty server with AOF enabled will create
- * a zero length file at startup, that will remain like that if no write
- * operation is received. */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
- server.aof_current_size = 0;
- server.aof_fsync_offset = server.aof_current_size;
fclose(fp);
+ sdsfree(aof_filepath);
return AOF_EMPTY;
}
@@ -742,7 +1304,6 @@ int loadAppendOnlyFile(char *filename) {
client *old_client = server.current_client;
fakeClient = server.current_client = createAOFClient();
- startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@@ -757,11 +1318,12 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
-
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
- serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
+ serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename);
goto readerr;
} else {
+ loadingAbsProgress(ftello(fp));
+ last_progress_report_size = ftello(fp);
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
}
}
@@ -776,17 +1338,19 @@ int loadAppendOnlyFile(char *filename) {
struct redisCommand *cmd;
/* Serve the clients from time to time */
- if (!(loops++ % 1000)) {
- loadingProgress(ftello(fp));
+ if (!(loops++ % 1024)) {
+ off_t progress_delta = ftello(fp) - last_progress_report_size;
+ loadingIncrProgress(progress_delta);
+ last_progress_report_size += progress_delta;
processEventsWhileBlocked();
processModuleLoadingProgressEvent(1);
}
-
if (fgets(buf,sizeof(buf),fp) == NULL) {
- if (feof(fp))
+ if (feof(fp)) {
break;
- else
+ } else {
goto readerr;
+ }
}
if (buf[0] == '#') continue; /* Skip annotations */
if (buf[0] != '*') goto fmterr;
@@ -837,8 +1401,8 @@ int loadAppendOnlyFile(char *filename) {
cmd = lookupCommand(argv,argc);
if (!cmd) {
serverLog(LL_WARNING,
- "Unknown command '%s' reading the append only file",
- (char*)argv[0]->ptr);
+ "Unknown command '%s' reading the append only file %s",
+ (char*)argv[0]->ptr, filename);
freeClientArgv(fakeClient);
ret = AOF_FAILED;
goto cleanup;
@@ -877,57 +1441,58 @@ int loadAppendOnlyFile(char *filename) {
* to remove the unprocessed tail and continue. */
if (fakeClient->flags & CLIENT_MULTI) {
serverLog(LL_WARNING,
- "Revert incomplete MULTI/EXEC transaction in AOF file");
+ "Revert incomplete MULTI/EXEC transaction in AOF file %s", filename);
valid_up_to = valid_before_multi;
goto uxeof;
}
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
+ loadingIncrProgress(ftello(fp) - last_progress_report_size);
server.aof_state = old_aof_state;
- aofUpdateCurrentSize();
- server.aof_rewrite_base_size = server.aof_current_size;
- server.aof_fsync_offset = server.aof_current_size;
- ret = AOF_OK;
goto cleanup;
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
if (!feof(fp)) {
- serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
+ serverLog(LL_WARNING,"Unrecoverable error reading the append only file %s: %s", filename, strerror(errno));
ret = AOF_FAILED;
goto cleanup;
}
uxeof: /* Unexpected AOF end of file. */
if (server.aof_load_truncated) {
- serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
- serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
- (unsigned long long) valid_up_to);
- if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
+ serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file %s!!!", filename);
+ serverLog(LL_WARNING,"!!! Truncating the AOF %s at offset %llu !!!",
+ filename, (unsigned long long) valid_up_to);
+ if (valid_up_to == -1 || truncate(aof_filepath,valid_up_to) == -1) {
if (valid_up_to == -1) {
serverLog(LL_WARNING,"Last valid command offset is invalid");
} else {
- serverLog(LL_WARNING,"Error truncating the AOF file: %s",
- strerror(errno));
+ serverLog(LL_WARNING,"Error truncating the AOF file %s: %s",
+ filename, strerror(errno));
}
} else {
/* Make sure the AOF file descriptor points to the end of the
* file after the truncate call. */
if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
- serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
- strerror(errno));
+ serverLog(LL_WARNING,"Can't seek the end of the AOF file %s: %s",
+ filename, strerror(errno));
} else {
serverLog(LL_WARNING,
- "AOF loaded anyway because aof-load-truncated is enabled");
+ "AOF %s loaded anyway because aof-load-truncated is enabled", filename);
+ ret = AOF_TRUNCATED;
goto loaded_ok;
}
}
}
- serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
+ serverLog(LL_WARNING,"Unexpected end of file reading the append only file %s. You can: \
+ 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. \
+ 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.", filename);
ret = AOF_FAILED;
goto cleanup;
fmterr: /* Format error. */
- serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
+ serverLog(LL_WARNING,"Bad file format reading the append only file %s: \
+ make a backup of your AOF file, then use ./redis-check-aof --fix <filename>", filename);
ret = AOF_FAILED;
/* fall through to cleanup. */
@@ -935,6 +1500,108 @@ cleanup:
if (fakeClient) freeClient(fakeClient);
server.current_client = old_client;
fclose(fp);
+ sdsfree(aof_filepath);
+ return ret;
+}
+
+/* Load the AOF files according to the aofManifest pointed to by am. */
+int loadAppendOnlyFiles(aofManifest *am) {
+ serverAssert(am != NULL);
+ int ret = C_OK;
+ long long start;
+ off_t total_size = 0;
+ sds aof_name;
+ int total_num, aof_num = 0, last_file;
+
+ /* If the 'server.aof_filename' file exists in dir, we may be starting
+ * from an old redis version. We will enter upgrade mode in three situations.
+ *
+ * 1. If the 'server.aof_dirname' directory does not exist
+ * 2. If the 'server.aof_dirname' directory exists but the manifest file is missing
+ * 3. If the 'server.aof_dirname' directory exists and the manifest file it contains
+ * has only one base AOF record, and the file name of this base AOF is 'server.aof_filename',
+ * and the 'server.aof_filename' file does not exist in the 'server.aof_dirname' directory
+ * */
+ if (fileExist(server.aof_filename)) {
+ if (!dirExists(server.aof_dirname) ||
+ (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) ||
+ (am->base_aof_info != NULL && listLength(am->incr_aof_list) == 0 &&
+ !strcmp(am->base_aof_info->file_name, server.aof_filename) && !aofFileExist(server.aof_filename)))
+ {
+ aofUpgradePrepare(am);
+ }
+ }
+
+ if (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) {
+ return AOF_NOT_EXIST;
+ }
+
+ total_num = getBaseAndIncrAppendOnlyFilesNum(am);
+ serverAssert(total_num > 0);
+
+ /* Here we calculate the total size of all BASE and INCR files in
+ * advance, it will be set to `server.loading_total_bytes`. */
+ total_size = getBaseAndIncrAppendOnlyFilesSize(server.aof_manifest);
+ startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);
+
+ /* Load BASE AOF if needed. */
+ if (am->base_aof_info) {
+ serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
+ aof_name = (char*)am->base_aof_info->file_name;
+ updateLoadingFileName(aof_name);
+ last_file = ++aof_num == total_num;
+ start = ustime();
+ ret = loadSingleAppendOnlyFile(aof_name);
+ if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
+ serverLog(LL_NOTICE, "DB loaded from base file %s: %.3f seconds",
+ aof_name, (float)(ustime()-start)/1000000);
+ }
+
+ /* If an AOF exists in the manifest but not on the disk, or the truncated
+ * file is not the last file, we consider this to be a fatal error. */
+ if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
+ ret = AOF_FAILED;
+ }
+
+ if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
+ goto cleanup;
+ }
+ }
+
+ /* Load INCR AOFs if needed. */
+ if (listLength(am->incr_aof_list)) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(am->incr_aof_list, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
+ aof_name = (char*)ai->file_name;
+ updateLoadingFileName(aof_name);
+ last_file = ++aof_num == total_num;
+ start = ustime();
+ ret = loadSingleAppendOnlyFile(aof_name);
+ if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
+ serverLog(LL_NOTICE, "DB loaded from incr file %s: %.3f seconds",
+ aof_name, (float)(ustime()-start)/1000000);
+ }
+
+ if (ret == AOF_NOT_EXIST || (ret == AOF_TRUNCATED && !last_file)) {
+ ret = AOF_FAILED;
+ }
+
+ if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
+ goto cleanup;
+ }
+ }
+ }
+
+ server.aof_current_size = total_size;
+ server.aof_rewrite_base_size = server.aof_current_size;
+ server.aof_fsync_offset = server.aof_current_size;
+
+cleanup:
stopLoading(ret == AOF_OK);
return ret;
}
@@ -1402,25 +2069,9 @@ int rewriteModuleObject(rio *r, robj *key, robj *o, int dbid) {
return io.error ? 0 : 1;
}
-/* This function is called by the child rewriting the AOF file to read
- * the difference accumulated from the parent into a buffer, that is
- * concatenated at the end of the rewrite. */
-ssize_t aofReadDiffFromParent(void) {
- char buf[65536]; /* Default pipe buffer size on most Linux systems. */
- ssize_t nread, total = 0;
-
- while ((nread =
- read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
- server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
- total += nread;
- }
- return total;
-}
-
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
- size_t processed = 0;
int j;
long key_count = 0;
long long updated_time = 0;
@@ -1493,11 +2144,6 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
- /* Read some diff from the parent process from time to time. */
- if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
- processed = aof->processed_bytes;
- aofReadDiffFromParent();
- }
/* Update info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
@@ -1535,7 +2181,6 @@ int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp = NULL;
char tmpfile[256];
- char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
@@ -1546,7 +2191,6 @@ int rewriteAppendOnlyFile(char *filename) {
return C_ERR;
}
- server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
@@ -1564,74 +2208,6 @@ int rewriteAppendOnlyFile(char *filename) {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
- /* Do an initial slow fsync here while the parent is still sending
- * data, in order to make the next final fsync faster. */
- if (fflush(fp) == EOF) goto werr;
- if (fsync(fileno(fp)) == -1) goto werr;
-
- /* Read again a few times to get more data from the parent.
- * We can't read forever (the server may receive data from clients
- * faster than it is able to send data to the child), so we try to read
- * some more data in a loop as soon as there is a good chance more data
- * will come. If it looks like we are wasting time, we abort (this
- * happens after 20 ms without new data). */
- int nodata = 0;
- mstime_t start = mstime();
- while(mstime()-start < 1000 && nodata < 20) {
- if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
- {
- nodata++;
- continue;
- }
- nodata = 0; /* Start counting from zero, we stop on N *contiguous*
- timeouts. */
- aofReadDiffFromParent();
- }
-
- /* Ask the master to stop sending diffs. */
- if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
- if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
- goto werr;
- /* We read the ACK from the server using a 5 seconds timeout. Normally
- * it should reply ASAP, but just in case we lose its reply, we are sure
- * the child will eventually get terminated. */
- if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
- byte != '!') goto werr;
- serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
-
- /* Read the final diff if any. */
- aofReadDiffFromParent();
-
- /* Write the received diff to the file. */
- serverLog(LL_NOTICE,
- "Concatenating %.2f MB of AOF diff received from parent.",
- (double) sdslen(server.aof_child_diff) / (1024*1024));
-
- /* Now we write the entire AOF buffer we received from the parent
- * via the pipe during the life of this fork child.
- * once a second, we'll take a break and send updated COW info to the parent */
- size_t bytes_to_write = sdslen(server.aof_child_diff);
- const char *buf = server.aof_child_diff;
- long long cow_updated_time = mstime();
- long long key_count = dbTotalServerKeyCount();
- while (bytes_to_write) {
- /* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */
- size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);
-
- if (rioWrite(&aof,buf,chunk_size) == 0)
- goto werr;
-
- bytes_to_write -= chunk_size;
- buf += chunk_size;
-
- /* Update COW info */
- long long now = mstime();
- if (now - cow_updated_time >= 1000) {
- sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
- cow_updated_time = now;
- }
- }
-
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
@@ -1648,6 +2224,7 @@ int rewriteAppendOnlyFile(char *filename) {
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
+
return C_OK;
werr:
@@ -1657,78 +2234,6 @@ werr:
stopSaving(0);
return C_ERR;
}
-
-/* ----------------------------------------------------------------------------
- * AOF rewrite pipes for IPC
- * -------------------------------------------------------------------------- */
-
-/* This event handler is called when the AOF rewriting child sends us a
- * single '!' char to signal we should stop sending buffer diffs. The
- * parent sends a '!' as well to acknowledge. */
-void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
- char byte;
- UNUSED(el);
- UNUSED(privdata);
- UNUSED(mask);
-
- if (read(fd,&byte,1) == 1 && byte == '!') {
- serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
- server.aof_stop_sending_diff = 1;
- if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
- /* If we can't send the ack, inform the user, but don't try again
- * since in the other side the children will use a timeout if the
- * kernel can't buffer our write, or, the children was
- * terminated. */
- serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
- strerror(errno));
- }
- }
- /* Remove the handler since this can be called only one time during a
- * rewrite. */
- aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
-}
-
-/* Create the pipes used for parent - child process IPC during rewrite.
- * We have a data pipe used to send AOF incremental diffs to the child,
- * and two other pipes used by the children to signal it finished with
- * the rewrite so no more data should be written, and another for the
- * parent to acknowledge it understood this new condition. */
-int aofCreatePipes(void) {
- int fds[6] = {-1, -1, -1, -1, -1, -1};
- int j;
-
- if (anetPipe(fds, O_NONBLOCK, O_NONBLOCK) == -1) goto error; /* parent -> children data, non blocking pipe */
- if (anetPipe(fds+2, 0, 0) == -1) goto error; /* children -> parent ack. */
- if (anetPipe(fds+4, 0, 0) == -1) goto error; /* parent -> children ack. */
- if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
-
- server.aof_pipe_write_data_to_child = fds[1];
- server.aof_pipe_read_data_from_parent = fds[0];
- server.aof_pipe_write_ack_to_parent = fds[3];
- server.aof_pipe_read_ack_from_child = fds[2];
- server.aof_pipe_write_ack_to_child = fds[5];
- server.aof_pipe_read_ack_from_parent = fds[4];
- server.aof_stop_sending_diff = 0;
- return C_OK;
-
-error:
- serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
- strerror(errno));
- for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
- return C_ERR;
-}
-
-void aofClosePipes(void) {
- aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
- aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
- close(server.aof_pipe_write_data_to_child);
- close(server.aof_pipe_read_data_from_parent);
- close(server.aof_pipe_write_ack_to_parent);
- close(server.aof_pipe_read_ack_from_child);
- close(server.aof_pipe_write_ack_to_child);
- close(server.aof_pipe_read_ack_from_parent);
-}
-
/* ----------------------------------------------------------------------------
* AOF background rewrite
* ------------------------------------------------------------------------- */
@@ -1738,18 +2243,34 @@ void aofClosePipes(void) {
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
- * 2b) the parent accumulates differences in server.aof_rewrite_buf.
+ * 2b) the parent opens a new INCR AOF file to continue writing.
* 3) When the child finished '2a' exists.
- * 4) The parent will trap the exit code, if it's OK, will append the
- * data accumulated into server.aof_rewrite_buf into the temp file, and
- * finally will rename(2) the temp file in the actual file name.
- * The the new file is reopened as the new append only file. Profit!
+ * 4) The parent will trap the exit code, if it's OK, it will:
+ * 4a) get a new BASE file name and mark the previous one (if any) as the HISTORY type
+ * 4b) rename(2) the temp file to the new BASE file name
+ * 4c) mark the rewritten INCR AOFs as HISTORY type
+ * 4d) persist the AOF manifest file
+ * 4e) delete the history files using bio
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
- if (aofCreatePipes() != C_OK) return C_ERR;
+
+ if (dirCreateIfMissing(server.aof_dirname) == -1) {
+ serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
+ server.aof_dirname, strerror(errno));
+ return C_ERR;
+ }
+
+ /* We set aof_selected_db to -1 in order to force the next call to the
+ * feedAppendOnlyFile() to issue a SELECT command, so that the commands
+ * appended to the new INCR AOF file opened below start with a SELECT
+ * statement. */
+ server.aof_selected_db = -1;
+ flushAppendOnlyFile(1);
+ if (openNewIncrAofForAppend() != C_OK) return C_ERR;
+
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
char tmpfile[256];
@@ -1770,19 +2291,12 @@ int rewriteAppendOnlyFileBackground(void) {
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
- aofClosePipes();
return C_ERR;
}
serverLog(LL_NOTICE,
"Background append only file rewriting started by pid %ld",(long) childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
-
- /* We set aof_selected_db to -1 in order to force the next call to the
- * feedAppendOnlyFile() to issue a SELECT command, so the differences
- * accumulated by the parent into server.aof_rewrite_buf will start
- * with a SELECT statement and it will be safe to merge. */
- server.aof_selected_db = -1;
return C_OK;
}
return C_OK; /* unreached */
@@ -1812,150 +2326,125 @@ void aofRemoveTempFile(pid_t childpid) {
bg_unlink(tmpfile);
}
-/* Update the server.aof_current_size field explicitly using stat(2)
- * to check the size of the file. This is useful after a rewrite or after
- * a restart, normally the size is updated just adding the write length
- * to the current length, that is much faster. */
-void aofUpdateCurrentSize(void) {
+off_t getAppendOnlyFileSize(sds filename) {
struct redis_stat sb;
+ off_t size;
mstime_t latency;
+ sds aof_filepath = makePath(server.aof_dirname, filename);
latencyStartMonitor(latency);
- if (redis_stat(server.aof_filename,&sb) == -1) {
- serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
- strerror(errno));
+ if (redis_stat(aof_filepath, &sb) == -1) {
+ serverLog(LL_WARNING, "Unable to obtain the AOF file %s length. stat: %s",
+ filename, strerror(errno));
+ size = 0;
} else {
- server.aof_current_size = sb.st_size;
+ size = sb.st_size;
}
latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-fstat",latency);
+ latencyAddSampleIfNeeded("aof-fstat", latency);
+ sdsfree(aof_filepath);
+ return size;
+}
+
+off_t getBaseAndIncrAppendOnlyFilesSize(aofManifest *am) {
+ off_t size = 0;
+
+ listNode *ln;
+ listIter li;
+
+ if (am->base_aof_info) {
+ serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
+ size += getAppendOnlyFileSize(am->base_aof_info->file_name);
+ }
+
+ listRewind(am->incr_aof_list, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ aofInfo *ai = (aofInfo*)ln->value;
+ serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
+ size += getAppendOnlyFileSize(ai->file_name);
+ }
+
+ return size;
+}
+
+int getBaseAndIncrAppendOnlyFilesNum(aofManifest *am) {
+ int num = 0;
+ if (am->base_aof_info) num++;
+ if (am->incr_aof_list) num += listLength(am->incr_aof_list);
+ return num;
}
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
- int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
+ sds new_base_filename;
+ aofManifest *temp_am;
mstime_t latency;
serverLog(LL_NOTICE,
"Background AOF rewrite terminated with success");
- /* Flush the differences accumulated by the parent to the
- * rewritten AOF. */
- latencyStartMonitor(latency);
- snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
+ snprintf(tmpfile, 256, "temp-rewriteaof-bg-%d.aof",
(int)server.child_pid);
- newfd = open(tmpfile,O_WRONLY|O_APPEND);
- if (newfd == -1) {
- serverLog(LL_WARNING,
- "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
- goto cleanup;
- }
- if (aofRewriteBufferWrite(newfd) == -1) {
- serverLog(LL_WARNING,
- "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
- close(newfd);
- goto cleanup;
- }
- latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
-
- if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {
- aof_background_fsync(newfd);
- } else if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
- latencyStartMonitor(latency);
- if (redis_fsync(newfd) == -1) {
- serverLog(LL_WARNING,
- "Error trying to fsync the parent diff to the rewritten AOF: %s", strerror(errno));
- close(newfd);
- goto cleanup;
- }
- latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency);
- }
+ serverAssert(server.aof_manifest != NULL);
- serverLog(LL_NOTICE,
- "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
-
- /* The only remaining thing to do is to rename the temporary file to
- * the configured file and switch the file descriptor used to do AOF
- * writes. We don't want close(2) or rename(2) calls to block the
- * server on old file deletion.
- *
- * There are two possible scenarios:
- *
- * 1) AOF is DISABLED and this was a one time rewrite. The temporary
- * file will be renamed to the configured file. When this file already
- * exists, it will be unlinked, which may block the server.
- *
- * 2) AOF is ENABLED and the rewritten AOF will immediately start
- * receiving writes. After the temporary file is renamed to the
- * configured file, the original AOF file descriptor will be closed.
- * Since this will be the last reference to that file, closing it
- * causes the underlying file to be unlinked, which may block the
- * server.
- *
- * To mitigate the blocking effect of the unlink operation (either
- * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
- * use a background thread to take care of this. First, we
- * make scenario 1 identical to scenario 2 by opening the target file
- * when it exists. The unlink operation after the rename(2) will then
- * be executed upon calling close(2) for its descriptor. Everything to
- * guarantee atomicity for this switch has already happened by then, so
- * we don't care what the outcome or duration of that close operation
- * is, as long as the file descriptor is released again. */
- if (server.aof_fd == -1) {
- /* AOF disabled */
-
- /* Don't care if this fails: oldfd will be -1 and we handle that.
- * One notable case of -1 return is if the old file does
- * not exist. */
- oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
- } else {
- /* AOF enabled */
- oldfd = -1; /* We'll set this to the current AOF file descriptor later. */
- }
+ /* Dup a temporary aof_manifest for subsequent modifications. */
+ temp_am = aofManifestDup(server.aof_manifest);
+
+ /* Get a new BASE file name and mark the previous (if we have)
+ * as the HISTORY type. */
+ new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
+ serverAssert(new_base_filename != NULL);
+ sds new_base_filepath = makePath(server.aof_dirname, new_base_filename);
- /* Rename the temporary file. This will not unlink the target file if
- * it exists, because we reference it with "oldfd". */
+ /* Rename the temporary aof file to 'new_base_filename'. */
latencyStartMonitor(latency);
- if (rename(tmpfile,server.aof_filename) == -1) {
+ if (rename(tmpfile, new_base_filepath) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
- server.aof_filename,
+ new_base_filename,
strerror(errno));
- close(newfd);
- if (oldfd != -1) close(oldfd);
+ aofManifestFree(temp_am);
+ sdsfree(new_base_filepath);
goto cleanup;
}
latencyEndMonitor(latency);
- latencyAddSampleIfNeeded("aof-rename",latency);
+ latencyAddSampleIfNeeded("aof-rename", latency);
- if (server.aof_fd == -1) {
- /* AOF disabled, we don't need to set the AOF file descriptor
- * to this new file, so we can close it. */
- close(newfd);
- } else {
- /* AOF enabled, replace the old fd with the new one. */
- oldfd = server.aof_fd;
- server.aof_fd = newfd;
+ /* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
+ * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
+ markRewrittenIncrAofAsHistory(temp_am);
+
+ /* Persist our modifications. */
+ if (persistAofManifest(temp_am) == C_ERR) {
+ bg_unlink(new_base_filepath);
+ aofManifestFree(temp_am);
+ sdsfree(new_base_filepath);
+ goto cleanup;
+ }
+ sdsfree(new_base_filepath);
+
+ /* We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */
+ aofManifestFreeAndUpdate(temp_am);
+
+ if (server.aof_fd != -1) {
+ /* AOF enabled. */
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
- aofUpdateCurrentSize();
+ server.aof_current_size = getAppendOnlyFileSize(new_base_filename) + server.aof_last_incr_size;
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
-
- /* Clear regular AOF buffer since its contents was just written to
- * the new AOF from the background rewrite buffer. */
- sdsfree(server.aof_buf);
- server.aof_buf = sdsempty();
}
+ /* We don't care about the return value of `aofDelHistoryFiles`, because the history
+ * deletion failure will not cause any problems. */
+ aofDelHistoryFiles();
+
server.aof_lastbgrewrite_status = C_OK;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
@@ -1963,9 +2452,6 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_state = AOF_ON;
- /* Asynchronously close the overwritten AOF. */
- if (oldfd != -1) bioCreateCloseJob(oldfd);
-
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
@@ -1984,8 +2470,6 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
}
cleanup:
- aofClosePipes();
- aofRewriteBufferReset();
aofRemoveTempFile(server.child_pid);
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
diff --git a/src/config.c b/src/config.c
index 0fc93d1de..79e43c301 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2150,6 +2150,10 @@ static int isValidDBfilename(char *val, const char **err) {
}
static int isValidAOFfilename(char *val, const char **err) {
+ if (!strcmp(val, "")) {
+ *err = "appendfilename can't be empty";
+ return 0;
+ }
if (!pathIsBaseName(val)) {
*err = "appendfilename can't be a path, just a filename";
return 0;
@@ -2157,6 +2161,18 @@ static int isValidAOFfilename(char *val, const char **err) {
return 1;
}
+/* Validator for the 'appenddirname' config value.
+ * Accepts 'val' only when it is non-empty and pathIsBaseName() approves it.
+ * Returns 1 when valid; otherwise returns 0 and points '*err' at a static
+ * message describing the problem. '*err' is left untouched on success. */
+static int isValidAOFdirname(char *val, const char **err) {
+    const char *reason = NULL;
+
+    if (strcmp(val, "") == 0) {
+        reason = "appenddirname can't be empty";
+    } else if (!pathIsBaseName(val)) {
+        reason = "appenddirname can't be a path, just a dirname";
+    }
+
+    if (reason == NULL) return 1;
+    *err = reason;
+    return 0;
+}
+
static int isValidAnnouncedHostname(char *val, const char **err) {
if (strlen(val) >= NET_HOST_STR_LEN) {
*err = "Hostnames must be less than "
@@ -2265,6 +2281,15 @@ static int updateAppendonly(const char **err) {
return 1;
}
+/* Callback attached to the 'aof-disable-auto-gc' bool config.
+ * If automatic GC is not disabled, call aofDelHistoryFiles() right away
+ * (its return value is ignored). Always returns 1; 'err' is never written. */
+static int updateAofAutoGCEnabled(const char **err) {
+    UNUSED(err);
+
+    int auto_gc_on = (server.aof_disable_auto_gc == 0);
+    if (auto_gc_on) aofDelHistoryFiles();
+    return 1;
+}
+
static int updateSighandlerEnabled(const char **err) {
UNUSED(err);
if (server.crashlog_enabled)
@@ -2680,7 +2705,8 @@ standardConfig configs[] = {
createBoolConfig("disable-thp", NULL, MODIFIABLE_CONFIG, server.disable_thp, 1, NULL, NULL),
createBoolConfig("cluster-allow-replica-migration", NULL, MODIFIABLE_CONFIG, server.cluster_allow_replica_migration, 1, NULL, NULL),
createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, server.replica_announced, 1, NULL, NULL),
-
+ createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
+
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
createStringConfig("unixsocket", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.unixsocket, NULL, NULL, NULL),
@@ -2693,6 +2719,7 @@ standardConfig configs[] = {
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL),
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL),
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
+ createStringConfig("appenddirname", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_dirname, "appendonlydir", isValidAOFdirname, NULL),
createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.server_cpulist, NULL, NULL, NULL),
createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL),
createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL),
diff --git a/src/debug.c b/src/debug.c
index 0b38b1856..c8a6fc186 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -570,7 +570,10 @@ NULL
if (server.aof_state != AOF_OFF) flushAppendOnlyFile(1);
emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
- int ret = loadAppendOnlyFile(server.aof_filename);
+ if (server.aof_manifest) aofManifestFree(server.aof_manifest);
+ aofLoadManifestFromDisk();
+ aofDelHistoryFiles();
+ int ret = loadAppendOnlyFiles(server.aof_manifest);
if (ret != AOF_OK && ret != AOF_EMPTY)
exit(1);
unprotectClient(c);
diff --git a/src/evict.c b/src/evict.c
index 6e2f8af72..9d0557df1 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -365,7 +365,7 @@ size_t freeMemoryGetNotCountedMemory(void) {
}
if (server.aof_state != AOF_OFF) {
- overhead += sdsAllocSize(server.aof_buf)+aofRewriteBufferMemoryUsage();
+ overhead += sdsAllocSize(server.aof_buf);
}
return overhead;
}
diff --git a/src/latency.c b/src/latency.c
index d24c7922a..5b3476a81 100644
--- a/src/latency.c
+++ b/src/latency.c
@@ -445,7 +445,7 @@ sds createLatencyReport(void) {
}
if (advise_ssd) {
- report = sdscat(report,"- SSD disks are able to reduce fsync latency, and total time needed for snapshotting and AOF log rewriting (resulting in smaller memory usage and smaller final AOF rewrite buffer flushes). With extremely high write load SSD disks can be a good option. However Redis should perform reasonably with high load using normal disks. Use this advice as a last resort.\n");
+ report = sdscat(report,"- SSD disks are able to reduce fsync latency, and total time needed for snapshotting and AOF log rewriting (resulting in smaller memory usage). With extremely high write load SSD disks can be a good option. However Redis should perform reasonably with high load using normal disks. Use this advice as a last resort.\n");
}
if (advise_data_writeback) {
diff --git a/src/object.c b/src/object.c
index 3eef47c5d..089547167 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1202,7 +1202,6 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mem = 0;
if (server.aof_state != AOF_OFF) {
mem += sdsZmallocSize(server.aof_buf);
- mem += aofRewriteBufferMemoryUsage();
}
mh->aof_buffer = mem;
mem_total+=mem;
diff --git a/src/rdb.c b/src/rdb.c
index 2b7a8fa81..b6d67181a 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1256,7 +1256,6 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
ssize_t written = 0;
ssize_t res;
static long long info_updated_time = 0;
- size_t processed = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
redisDb *db = server.db + dbid;
@@ -1299,16 +1298,6 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);
- /* When this RDB is produced as part of an AOF rewrite, move
- * accumulated diff from parent to child while rewriting in
- * order to have a smaller final write. */
- if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
- rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
- {
- processed = rdb->processed_bytes;
- aofReadDiffFromParent();
- }
-
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
@@ -2677,21 +2666,30 @@ void startLoading(size_t size, int rdbflags, int async) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
-void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
- struct stat sb;
- if (fstat(fileno(fp), &sb) == -1)
- sb.st_size = 0;
+void startLoadingFile(size_t size, char* filename, int rdbflags) {
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size, rdbflags, 0);
+ startLoading(size, rdbflags, 0);
}
-/* Refresh the loading progress info */
-void loadingProgress(off_t pos) {
+/* Refresh the absolute loading progress info */
+void loadingAbsProgress(off_t pos) {
server.loading_loaded_bytes = pos;
if (server.stat_peak_memory < zmalloc_used_memory())
server.stat_peak_memory = zmalloc_used_memory();
}
+/* Refresh the incremental loading progress info: account for 'size' more
+ * loaded bytes, then raise the recorded peak memory if the current usage
+ * reported by zmalloc_used_memory() is higher. */
+void loadingIncrProgress(off_t size) {
+    server.loading_loaded_bytes = server.loading_loaded_bytes + size;
+    if (zmalloc_used_memory() > server.stat_peak_memory) {
+        server.stat_peak_memory = zmalloc_used_memory();
+    }
+}
+
+/* Update the file name currently being loaded.
+ * Only the pointer is stored in rdbFileBeingLoaded; the string itself is
+ * not copied. */
+void updateLoadingFileName(char* filename) {
+ rdbFileBeingLoaded = filename;
+}
+
/* Loading finished */
void stopLoading(int success) {
server.loading = 0;
@@ -2738,7 +2736,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
{
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
- loadingProgress(r->processed_bytes);
+ loadingAbsProgress(r->processed_bytes);
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
}
@@ -3176,9 +3174,14 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
+ struct stat sb;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoadingFile(fp, filename,rdbflags);
+
+ if (fstat(fileno(fp), &sb) == -1)
+ sb.st_size = 0;
+
+ startLoadingFile(sb.st_size, filename, rdbflags);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index ea608b7b8..eb41b0f66 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -34,6 +34,7 @@
#include <stdarg.h>
#include <sys/time.h>
#include <unistd.h>
+#include <sys/stat.h>
void createSharedObjects(void);
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
@@ -192,11 +193,15 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
char buf[1024];
long long expiretime, now = mstime();
static rio rdb; /* Pointed by global struct riostate. */
+ struct stat sb;
int closefile = (fp == NULL);
if (fp == NULL && (fp = fopen(rdbfilename,"r")) == NULL) return 1;
- startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
+ if (fstat(fileno(fp), &sb) == -1)
+ sb.st_size = 0;
+
+ startLoadingFile(sb.st_size, rdbfilename, RDBFLAGS_NONE);
rioInitWithFile(&rdb,fp);
rdbstate.rio = &rdb;
rdb.update_cksum = rdbLoadProgressCallback;
diff --git a/src/server.c b/src/server.c
index 6b65a460c..4bb6787f9 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1191,7 +1191,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (!hasActiveChildProcess() &&
- server.aof_rewrite_scheduled)
+ server.aof_rewrite_scheduled &&
+ !aofRewriteLimited())
{
rewriteAppendOnlyFileBackground();
}
@@ -1230,7 +1231,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
- server.aof_current_size > server.aof_rewrite_min_size)
+ server.aof_current_size > server.aof_rewrite_min_size &&
+ !aofRewriteLimited())
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
@@ -1248,8 +1250,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
- if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)
+ if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
+ server.aof_flush_postponed_start)
+ {
flushAppendOnlyFile(0);
+ }
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
@@ -1506,7 +1511,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
- if (server.aof_state == AOF_ON)
+ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
/* Try to process blocked clients every once in while. Example: A module
@@ -1761,6 +1766,7 @@ void initServerConfig(void) {
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
+ server.aof_last_incr_size = 0;
server.active_defrag_running = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
@@ -2390,7 +2396,6 @@ void initServer(void) {
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_nread = 0;
- aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
@@ -3859,6 +3864,9 @@ int finishShutdown(void) {
}
}
+ /* Free the AOF manifest. */
+ if (server.aof_manifest) aofManifestFree(server.aof_manifest);
+
/* Fire the shutdown modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
@@ -4978,14 +4986,12 @@ sds genRedisInfoString(const char *section) {
"aof_base_size:%lld\r\n"
"aof_pending_rewrite:%d\r\n"
"aof_buffer_length:%zu\r\n"
- "aof_rewrite_buffer_length:%lu\r\n"
"aof_pending_bio_fsync:%llu\r\n"
"aof_delayed_fsync:%lu\r\n",
(long long) server.aof_current_size,
(long long) server.aof_rewrite_base_size,
server.aof_rewrite_scheduled,
sdslen(server.aof_buf),
- aofRewriteBufferSize(),
bioPendingJobsOfType(BIO_AOF_FSYNC),
server.aof_delayed_fsync);
}
@@ -6017,12 +6023,9 @@ int checkForSentinelMode(int argc, char **argv, char *exec_name) {
void loadDataFromDisk(void) {
long long start = ustime();
if (server.aof_state == AOF_ON) {
- /* It's not a failure if the file is empty or doesn't exist (later we will create it) */
- int ret = loadAppendOnlyFile(server.aof_filename);
+ int ret = loadAppendOnlyFiles(server.aof_manifest);
if (ret == AOF_FAILED || ret == AOF_OPEN_ERR)
exit(1);
- if (ret == AOF_OK)
- serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
errno = 0; /* Prevent a stale value from affecting error checking */
@@ -6492,17 +6495,10 @@ int main(int argc, char **argv) {
moduleLoadFromQueue();
ACLLoadUsersAtStartup();
InitServerLast();
+ aofLoadManifestFromDisk();
loadDataFromDisk();
- /* Open the AOF file if needed. */
- if (server.aof_state == AOF_ON) {
- server.aof_fd = open(server.aof_filename,
- O_WRONLY|O_APPEND|O_CREAT,0644);
- if (server.aof_fd == -1) {
- serverLog(LL_WARNING, "Can't open the append-only file: %s",
- strerror(errno));
- exit(1);
- }
- }
+ aofOpenIfNeededOnServerStart();
+ aofDelHistoryFiles();
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
diff --git a/src/server.h b/src/server.h
index fb22b3cf7..7aa73ab73 100644
--- a/src/server.h
+++ b/src/server.h
@@ -105,7 +105,6 @@ typedef long long ustime_t; /* microsecond time type. */
#define OBJ_SHARED_BULKHDR_LEN 32
#define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/
#define AOF_REWRITE_ITEMS_PER_CMD 64
-#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
#define AOF_ANNOTATION_LINE_MAX_LEN 1024
#define CONFIG_AUTHPASS_MAX_LEN 512
#define CONFIG_RUN_ID_SIZE 40
@@ -251,6 +250,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define AOF_EMPTY 2
#define AOF_OPEN_ERR 3
#define AOF_FAILED 4
+#define AOF_TRUNCATED 5
/* Command doc flags */
#define CMD_DOC_NONE 0
@@ -1340,6 +1340,33 @@ typedef struct redisTLSContextConfig {
} redisTLSContextConfig;
/*-----------------------------------------------------------------------------
+ * AOF manifest definition
+ *----------------------------------------------------------------------------*/
+typedef enum {
+ AOF_FILE_TYPE_BASE = 'b', /* BASE file: the full data set produced by an AOF rewrite */
+ AOF_FILE_TYPE_HIST = 'h', /* HISTORY file: a former BASE/INCR file superseded by a rewrite */
+ AOF_FILE_TYPE_INCR = 'i', /* INCR file: incremental commands since the last rewrite */
+} aof_file_type;
+
+typedef struct { /* Describes a single AOF file. */
+ sds file_name; /* Name of the file */
+ long long file_seq; /* Sequence number of the file */
+ aof_file_type file_type; /* Type of the file: BASE, HISTORY or INCR (see aof_file_type) */
+} aofInfo;
+
+typedef struct {
+ aofInfo *base_aof_info; /* BASE file information. NULL if there is no BASE file. */
+ list *incr_aof_list; /* INCR AOF list. There may be multiple INCR AOFs when a rewrite fails. */
+ list *history_aof_list; /* HISTORY AOF list. When an AOFRW succeeds, the aofInfo entries held in
+ `base_aof_info` and `incr_aof_list` are moved to this list. These
+ AOF files are deleted when the AOFRW finishes. */
+ long long curr_base_file_seq; /* The sequence number used by the current BASE file. */
+ long long curr_incr_file_seq; /* The sequence number used by the current INCR file. */
+ int dirty; /* 1 indicates that the in-memory aofManifest is inconsistent with the one on
+ disk and needs to be persisted immediately. */
+} aofManifest;
+
+/*-----------------------------------------------------------------------------
* Global server state
*----------------------------------------------------------------------------*/
@@ -1556,12 +1583,14 @@ struct redisServer {
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
- char *aof_filename; /* Name of the AOF file */
+ char *aof_filename; /* Basename of the AOF file and manifest file */
+ char *aof_dirname; /* Name of the AOF directory */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
- off_t aof_current_size; /* AOF current size. */
+ off_t aof_current_size; /* AOF current size (Including BASE + INCRs). */
+ off_t aof_last_incr_size; /* The size of the latest incr AOF. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
@@ -1585,16 +1614,10 @@ struct redisServer {
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
redisAtomic int aof_bio_fsync_status; /* Status of AOF fsync in bio job. */
redisAtomic int aof_bio_fsync_errno; /* Errno of AOF fsync in bio job. */
- /* AOF pipes used to communicate between parent and child during rewrite. */
- int aof_pipe_write_data_to_child;
- int aof_pipe_read_data_from_parent;
- int aof_pipe_write_ack_to_parent;
- int aof_pipe_read_ack_from_child;
- int aof_pipe_write_ack_to_child;
- int aof_pipe_read_ack_from_parent;
- int aof_stop_sending_diff; /* If true stop sending accumulated diffs
- to child process. */
- sds aof_child_diff; /* AOF diff accumulator child side. */
+ aofManifest *aof_manifest; /* Used to track AOFs. */
+ int aof_disable_auto_gc; /* Whether automatic deletion of HISTORY type AOFs is
+ disabled. Default: no (for testing only). */
+
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
@@ -2571,10 +2594,12 @@ void abortFailover(const char *err);
const char *getFailoverStateString();
/* Generic persistence functions */
-void startLoadingFile(FILE* fp, char* filename, int rdbflags);
+void startLoadingFile(size_t size, char* filename, int rdbflags);
void startLoading(size_t size, int rdbflags, int async);
-void loadingProgress(off_t pos);
+void loadingAbsProgress(off_t pos);
+void loadingIncrProgress(off_t size);
void stopLoading(int success);
+void updateLoadingFileName(char* filename);
void startSaving(int rdbflags);
void stopSaving(int success);
int allPersistenceDisabled(void);
@@ -2594,16 +2619,18 @@ void flushAppendOnlyFile(int force);
void feedAppendOnlyFile(int dictid, robj **argv, int argc);
void aofRemoveTempFile(pid_t childpid);
int rewriteAppendOnlyFileBackground(void);
-int loadAppendOnlyFile(char *filename);
+int loadAppendOnlyFiles(aofManifest *am);
void stopAppendOnly(void);
int startAppendOnly(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
-void aofRewriteBufferReset(void);
-unsigned long aofRewriteBufferSize(void);
-unsigned long aofRewriteBufferMemoryUsage(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
void restartAOFAfterSYNC();
+void aofLoadManifestFromDisk(void);
+void aofOpenIfNeededOnServerStart(void);
+void aofManifestFree(aofManifest *am);
+int aofDelHistoryFiles(void);
+int aofRewriteLimited(void);
/* Child info */
void openChildInfoPipe(void);
diff --git a/src/util.c b/src/util.c
index 9cdc30632..d11a07655 100644
--- a/src/util.c
+++ b/src/util.c
@@ -40,9 +40,13 @@
#include <stdint.h>
#include <errno.h>
#include <time.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <fcntl.h>
#include "util.h"
#include "sha256.h"
+#include "config.h"
/* Glob-style pattern matching. */
int stringmatchlen(const char *pattern, int patternLen,
@@ -810,6 +814,82 @@ int pathIsBaseName(char *path) {
return strchr(path,'/') == NULL && strchr(path,'\\') == NULL;
}
+/* Return 1 if 'filename' can be stat()ed and is a regular file, 0 otherwise. */
+int fileExist(char *filename) {
+    struct stat st;
+
+    if (stat(filename, &st) != 0) return 0;
+    return S_ISREG(st.st_mode) ? 1 : 0;
+}
+
+/* Return 1 if 'dname' can be stat()ed and is a directory, 0 otherwise. */
+int dirExists(char *dname) {
+    struct stat st;
+
+    if (stat(dname, &st) != 0) return 0;
+    return S_ISDIR(st.st_mode) ? 1 : 0;
+}
+
+/* Make sure the directory 'dname' exists, creating it with mode 0755 if needed.
+ * Returns 0 if the directory was created or already exists as a directory.
+ * Returns -1 on failure: errno is the one set by mkdir(), or ENOTDIR when
+ * the path already exists but dirExists() does not report a directory. */
+int dirCreateIfMissing(char *dname) {
+    if (mkdir(dname, 0755) == 0) return 0;
+
+    /* Any failure other than "already exists" is a real error. */
+    if (errno != EEXIST) return -1;
+
+    if (dirExists(dname)) return 0;
+
+    errno = ENOTDIR;
+    return -1;
+}
+
+/* Recursively remove the directory 'dname' together with everything in it.
+ *
+ * Every entry except "." and ".." is opened and fstat()ed: directories are
+ * removed through a recursive call, anything else is unlink()ed. Finally
+ * the directory itself is removed with rmdir().
+ *
+ * Returns 0 on success. Returns -1 on the first failure, with errno as set
+ * by the failing call (or ENAMETOOLONG when an entry path cannot be built
+ * in the local buffer). Entries removed before the failure stay removed. */
+int dirRemove(char *dname) {
+    DIR *dir;
+    struct stat stat_entry;
+    struct dirent *entry;
+    char full_path[PATH_MAX + 1];
+    int saved_errno;
+
+    if ((dir = opendir(dname)) == NULL) {
+        return -1;
+    }
+
+    while ((entry = readdir(dir)) != NULL) {
+        if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, "..")) continue;
+
+        /* Never operate on a truncated path: it could name a different file. */
+        int len = snprintf(full_path, sizeof(full_path), "%s/%s", dname, entry->d_name);
+        if (len < 0 || (size_t)len >= sizeof(full_path)) {
+            errno = ENAMETOOLONG;
+            goto error;
+        }
+
+        int fd = open(full_path, O_RDONLY|O_NONBLOCK);
+        if (fd == -1) goto error;
+
+        if (fstat(fd, &stat_entry) == -1) {
+            /* close() may overwrite errno; keep the fstat() one. */
+            saved_errno = errno;
+            close(fd);
+            errno = saved_errno;
+            goto error;
+        }
+        close(fd);
+
+        if (S_ISDIR(stat_entry.st_mode) != 0) {
+            /* A failed nested removal must also release 'dir'. */
+            if (dirRemove(full_path) == -1) goto error;
+            continue;
+        }
+
+        if (unlink(full_path) != 0) goto error;
+    }
+
+    if (rmdir(dname) != 0) goto error;
+
+    closedir(dir);
+    return 0;
+
+error:
+    /* Single exit for all failures after opendir(): close the directory
+     * stream without losing the errno of the call that failed. */
+    saved_errno = errno;
+    closedir(dir);
+    errno = saved_errno;
+    return -1;
+}
+
+/* Build and return an sds string of the form "<path>/<filename>",
+ * starting from a fresh sdsempty() string. */
+sds makePath(char *path, char *filename) {
+    sds joined = sdsempty();
+    joined = sdscatfmt(joined, "%s/%s", path, filename);
+    return joined;
+}
+
#ifdef REDIS_TEST
#include <assert.h>
diff --git a/src/util.h b/src/util.h
index 62fc74986..e2b0d13ca 100644
--- a/src/util.h
+++ b/src/util.h
@@ -65,6 +65,11 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
sds getAbsolutePath(char *filename);
long getTimeZone(void);
int pathIsBaseName(char *path);
+int dirCreateIfMissing(char *dname);
+int dirExists(char *dname);
+int dirRemove(char *dname);
+int fileExist(char *filename);
+sds makePath(char *path, char *filename);
#ifdef REDIS_TEST
int utilTest(int argc, char **argv, int flags);