diff options
Diffstat (limited to 'src/aof.c')
-rw-r--r-- | src/aof.c | 123 |
1 files changed, 96 insertions, 27 deletions
@@ -87,7 +87,7 @@ void aofManifestFreeAndUpdate(aofManifest *am); #define RDB_FORMAT_SUFFIX ".rdb" #define AOF_FORMAT_SUFFIX ".aof" #define MANIFEST_NAME_SUFFIX ".manifest" -#define MANIFEST_TEMP_NAME_PREFIX "temp_" +#define TEMP_FILE_NAME_PREFIX "temp-" /* AOF manifest key. */ #define AOF_MANIFEST_KEY_FILE_NAME "file" @@ -169,7 +169,7 @@ sds getAofManifestFileName() { } sds getTempAofManifestFileName() { - return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX, + return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename, MANIFEST_NAME_SUFFIX); } @@ -462,6 +462,12 @@ sds getNewIncrAofName(aofManifest *am) { return ai->file_name; } +/* Get temp INCR type AOF name. */ +sds getTempIncrAofName() { + return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename, + INCR_FILE_SUFFIX); +} + /* Get the last INCR AOF name or create a new one. */ sds getLastIncrAofName(aofManifest *am) { serverAssert(am != NULL); @@ -674,6 +680,17 @@ int aofDelHistoryFiles(void) { return persistAofManifest(server.aof_manifest); } +/* Used to clean up temp INCR AOF when AOFRW fails. */ +void aofDelTempIncrAofFile() { + sds aof_filename = getTempIncrAofName(); + sds aof_filepath = makePath(server.aof_dirname, aof_filename); + serverLog(LL_NOTICE, "Removing the temp incr aof file %s in the background", aof_filename); + bg_unlink(aof_filepath); + sdsfree(aof_filepath); + sdsfree(aof_filename); + return; +} + /* Called after `loadDataFromDisk` when redis start. If `server.aof_state` is * 'AOF_ON', It will do three things: * 1. Force create a BASE file when redis starts with an empty dataset @@ -739,44 +756,52 @@ int aofFileExist(char *filename) { } /* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state` - * is 'AOF_ON' or 'AOF_WAIT_REWRITE', It will do two things: + * is 'AOF_ON', It will do two things: * 1. Open a new INCR type AOF for writing * 2. Synchronously update the manifest file to the disk * * The above two steps of modification are atomic, that is, if * any step fails, the entire operation will rollback and returns * C_ERR, and if all succeeds, it returns C_OK. + * + * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF + * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be + * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file. * */ int openNewIncrAofForAppend(void) { serverAssert(server.aof_manifest != NULL); - int newfd; + int newfd = -1; + aofManifest *temp_am = NULL; + sds new_aof_name = NULL; /* Only open new INCR AOF when AOF enabled. */ if (server.aof_state == AOF_OFF) return C_OK; - /* Dup a temp aof_manifest to modify. */ - aofManifest *temp_am = aofManifestDup(server.aof_manifest); - /* Open new AOF. */ - sds new_aof_name = getNewIncrAofName(temp_am); + if (server.aof_state == AOF_WAIT_REWRITE) { + /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */ + new_aof_name = getTempIncrAofName(); + } else { + /* Dup a temp aof_manifest to modify. */ + temp_am = aofManifestDup(server.aof_manifest); + new_aof_name = sdsdup(getNewIncrAofName(temp_am)); + } sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name); newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644); sdsfree(new_aof_filepath); if (newfd == -1) { serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_aof_name, strerror(errno)); - - aofManifestFree(temp_am); - return C_ERR; + goto cleanup; } - /* Persist AOF Manifest. */ - int ret = persistAofManifest(temp_am); - if (ret == C_ERR) { - close(newfd); - aofManifestFree(temp_am); - return C_ERR; + if (temp_am) { + /* Persist AOF Manifest. */ + if (persistAofManifest(temp_am) == C_ERR) { + goto cleanup; + } } + sdsfree(new_aof_name); /* If reaches here, we can safely modify the `server.aof_manifest` * and `server.aof_fd`. */ @@ -788,8 +813,14 @@ int openNewIncrAofForAppend(void) { /* Reset the aof_last_incr_size. */ server.aof_last_incr_size = 0; /* Update `server.aof_manifest`. */ - aofManifestFreeAndUpdate(temp_am); + if (temp_am) aofManifestFreeAndUpdate(temp_am); return C_OK; + +cleanup: + if (new_aof_name) sdsfree(new_aof_name); + if (newfd != -1) close(newfd); + if (temp_am) aofManifestFree(temp_am); + return C_ERR; } /* Whether to limit the execution of Background AOF rewrite. @@ -818,17 +849,13 @@ int aofRewriteLimited(void) { static int next_delay_minutes = 0; static time_t next_rewrite_time = 0; - /* If the number of incr AOFs exceeds the threshold but server.aof_lastbgrewrite_status is OK, it - * means that redis may have just loaded a dataset containing many incr AOFs. At this time, we - * will not limit the AOFRW. */ - unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list); - if (incr_aof_num < AOF_REWRITE_LIMITE_THRESHOLD || server.aof_lastbgrewrite_status == C_OK) { + if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) { /* We may be recovering from limited state, so reset all states. */ next_delay_minutes = 0; next_rewrite_time = 0; return 0; } - + /* if it is in the limiting state, then check if the next_rewrite_time is reached */ if (next_rewrite_time != 0) { if (server.unixtime < next_rewrite_time) { @@ -1266,7 +1293,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == AOF_ON || - server.child_type == CHILD_TYPE_AOF) + (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF)) { server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf)); } @@ -2404,6 +2431,9 @@ void bgrewriteaofCommand(client *c) { addReplyError(c,"Background append only file rewriting already in progress"); } else if (hasActiveChildProcess() || server.in_exec) { server.aof_rewrite_scheduled = 1; + /* When manually triggering AOFRW we reset the count + * so that it can be executed immediately. */ + server.stat_aofrw_consecutive_failures = 0; addReplyStatus(c,"Background append only file rewriting scheduled"); } else if (rewriteAppendOnlyFileBackground() == C_OK) { addReplyStatus(c,"Background append only file rewriting started"); @@ -2514,7 +2544,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF file %s into %s: %s", tmpfile, - new_base_filename, + new_base_filepath, strerror(errno)); aofManifestFree(temp_am); sdsfree(new_base_filepath); @@ -2523,6 +2553,35 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rename", latency); + /* Rename the temporary incr aof file to 'new_incr_filename'. */ + if (server.aof_state == AOF_WAIT_REWRITE) { + /* Get temporary incr aof name. */ + sds temp_incr_aof_name = getTempIncrAofName(); + sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name); + sdsfree(temp_incr_aof_name); + /* Get next new incr aof name. */ + sds new_incr_filename = getNewIncrAofName(temp_am); + sds new_incr_filepath = makePath(server.aof_dirname, new_incr_filename); + latencyStartMonitor(latency); + if (rename(temp_incr_filepath, new_incr_filepath) == -1) { + serverLog(LL_WARNING, + "Error trying to rename the temporary incr AOF file %s into %s: %s", + temp_incr_filepath, + new_incr_filepath, + strerror(errno)); + bg_unlink(new_base_filepath); + sdsfree(new_base_filepath); + aofManifestFree(temp_am); + sdsfree(temp_incr_filepath); + sdsfree(new_incr_filepath); + goto cleanup; + } + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("aof-rename", latency); + sdsfree(temp_incr_filepath); + sdsfree(new_incr_filepath); + } + /* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */ markRewrittenIncrAofAsHistory(temp_am); @@ -2553,6 +2612,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { aofDelHistoryFiles(); server.aof_lastbgrewrite_status = C_OK; + server.stat_aofrw_consecutive_failures = 0; serverLog(LL_NOTICE, "Background AOF rewrite finished successfully"); /* Change state from WAIT_REWRITE to ON if needed */ @@ -2563,14 +2623,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { server.aof_lastbgrewrite_status = C_ERR; + server.stat_aofrw_consecutive_failures++; serverLog(LL_WARNING, "Background AOF rewrite terminated with error"); } else { /* SIGUSR1 is whitelisted, so we have a way to kill a child without * triggering an error condition. */ - if (bysignal != SIGUSR1) + if (bysignal != SIGUSR1) { server.aof_lastbgrewrite_status = C_ERR; + server.stat_aofrw_consecutive_failures++; + } serverLog(LL_WARNING, "Background AOF rewrite terminated by signal %d", bysignal); @@ -2578,6 +2641,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { cleanup: aofRemoveTempFile(server.child_pid); + /* Clear AOF buffer and delete temp incr aof for next rewrite. */ + if (server.aof_state == AOF_WAIT_REWRITE) { + sdsfree(server.aof_buf); + server.aof_buf = sdsempty(); + aofDelTempIncrAofFile(); + } server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ |