summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c123
1 files changed, 96 insertions, 27 deletions
diff --git a/src/aof.c b/src/aof.c
index 9a9368e61..c196debed 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -87,7 +87,7 @@ void aofManifestFreeAndUpdate(aofManifest *am);
#define RDB_FORMAT_SUFFIX ".rdb"
#define AOF_FORMAT_SUFFIX ".aof"
#define MANIFEST_NAME_SUFFIX ".manifest"
-#define MANIFEST_TEMP_NAME_PREFIX "temp_"
+#define TEMP_FILE_NAME_PREFIX "temp-"
/* AOF manifest key. */
#define AOF_MANIFEST_KEY_FILE_NAME "file"
@@ -169,7 +169,7 @@ sds getAofManifestFileName() {
}
sds getTempAofManifestFileName() {
- return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX,
+ return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX,
server.aof_filename, MANIFEST_NAME_SUFFIX);
}
@@ -462,6 +462,12 @@ sds getNewIncrAofName(aofManifest *am) {
return ai->file_name;
}
+/* Get temp INCR type AOF name. */
+sds getTempIncrAofName() {
+ return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename,
+ INCR_FILE_SUFFIX);
+}
+
/* Get the last INCR AOF name or create a new one. */
sds getLastIncrAofName(aofManifest *am) {
serverAssert(am != NULL);
@@ -674,6 +680,17 @@ int aofDelHistoryFiles(void) {
return persistAofManifest(server.aof_manifest);
}
+/* Used to clean up temp INCR AOF when AOFRW fails. */
+void aofDelTempIncrAofFile() {
+ sds aof_filename = getTempIncrAofName();
+ sds aof_filepath = makePath(server.aof_dirname, aof_filename);
+ serverLog(LL_NOTICE, "Removing the temp incr aof file %s in the background", aof_filename);
+ bg_unlink(aof_filepath);
+ sdsfree(aof_filepath);
+ sdsfree(aof_filename);
+ return;
+}
+
/* Called after `loadDataFromDisk` when redis start. If `server.aof_state` is
* 'AOF_ON', It will do three things:
* 1. Force create a BASE file when redis starts with an empty dataset
@@ -739,44 +756,52 @@ int aofFileExist(char *filename) {
}
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
- * is 'AOF_ON' or 'AOF_WAIT_REWRITE', It will do two things:
+ * is 'AOF_ON', It will do two things:
* 1. Open a new INCR type AOF for writing
* 2. Synchronously update the manifest file to the disk
*
* The above two steps of modification are atomic, that is, if
* any step fails, the entire operation will rollback and returns
* C_ERR, and if all succeeds, it returns C_OK.
+ *
+ * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF
+ * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
+ * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
* */
int openNewIncrAofForAppend(void) {
serverAssert(server.aof_manifest != NULL);
- int newfd;
+ int newfd = -1;
+ aofManifest *temp_am = NULL;
+ sds new_aof_name = NULL;
/* Only open new INCR AOF when AOF enabled. */
if (server.aof_state == AOF_OFF) return C_OK;
- /* Dup a temp aof_manifest to modify. */
- aofManifest *temp_am = aofManifestDup(server.aof_manifest);
-
/* Open new AOF. */
- sds new_aof_name = getNewIncrAofName(temp_am);
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
+ new_aof_name = getTempIncrAofName();
+ } else {
+ /* Dup a temp aof_manifest to modify. */
+ temp_am = aofManifestDup(server.aof_manifest);
+ new_aof_name = sdsdup(getNewIncrAofName(temp_am));
+ }
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
sdsfree(new_aof_filepath);
if (newfd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
new_aof_name, strerror(errno));
-
- aofManifestFree(temp_am);
- return C_ERR;
+ goto cleanup;
}
- /* Persist AOF Manifest. */
- int ret = persistAofManifest(temp_am);
- if (ret == C_ERR) {
- close(newfd);
- aofManifestFree(temp_am);
- return C_ERR;
+ if (temp_am) {
+ /* Persist AOF Manifest. */
+ if (persistAofManifest(temp_am) == C_ERR) {
+ goto cleanup;
+ }
}
+ sdsfree(new_aof_name);
/* If reaches here, we can safely modify the `server.aof_manifest`
* and `server.aof_fd`. */
@@ -788,8 +813,14 @@ int openNewIncrAofForAppend(void) {
/* Reset the aof_last_incr_size. */
server.aof_last_incr_size = 0;
/* Update `server.aof_manifest`. */
- aofManifestFreeAndUpdate(temp_am);
+ if (temp_am) aofManifestFreeAndUpdate(temp_am);
return C_OK;
+
+cleanup:
+ if (new_aof_name) sdsfree(new_aof_name);
+ if (newfd != -1) close(newfd);
+ if (temp_am) aofManifestFree(temp_am);
+ return C_ERR;
}
/* Whether to limit the execution of Background AOF rewrite.
@@ -818,17 +849,13 @@ int aofRewriteLimited(void) {
static int next_delay_minutes = 0;
static time_t next_rewrite_time = 0;
- /* If the number of incr AOFs exceeds the threshold but server.aof_lastbgrewrite_status is OK, it
- * means that redis may have just loaded a dataset containing many incr AOFs. At this time, we
- * will not limit the AOFRW. */
- unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list);
- if (incr_aof_num < AOF_REWRITE_LIMITE_THRESHOLD || server.aof_lastbgrewrite_status == C_OK) {
+ if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) {
/* We may be recovering from limited state, so reset all states. */
next_delay_minutes = 0;
next_rewrite_time = 0;
return 0;
}
-
+
/* if it is in the limiting state, then check if the next_rewrite_time is reached */
if (next_rewrite_time != 0) {
if (server.unixtime < next_rewrite_time) {
@@ -1266,7 +1293,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON ||
- server.child_type == CHILD_TYPE_AOF)
+ (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
@@ -2404,6 +2431,9 @@ void bgrewriteaofCommand(client *c) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (hasActiveChildProcess() || server.in_exec) {
server.aof_rewrite_scheduled = 1;
+ /* When manually triggering AOFRW we reset the count
+ * so that it can be executed immediately. */
+ server.stat_aofrw_consecutive_failures = 0;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
@@ -2514,7 +2544,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
- new_base_filename,
+ new_base_filepath,
strerror(errno));
aofManifestFree(temp_am);
sdsfree(new_base_filepath);
@@ -2523,6 +2553,35 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename", latency);
+ /* Rename the temporary incr aof file to 'new_incr_filename'. */
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ /* Get temporary incr aof name. */
+ sds temp_incr_aof_name = getTempIncrAofName();
+ sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
+ sdsfree(temp_incr_aof_name);
+ /* Get next new incr aof name. */
+ sds new_incr_filename = getNewIncrAofName(temp_am);
+ sds new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
+ latencyStartMonitor(latency);
+ if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
+ serverLog(LL_WARNING,
+ "Error trying to rename the temporary incr AOF file %s into %s: %s",
+ temp_incr_filepath,
+ new_incr_filepath,
+ strerror(errno));
+ bg_unlink(new_base_filepath);
+ sdsfree(new_base_filepath);
+ aofManifestFree(temp_am);
+ sdsfree(temp_incr_filepath);
+ sdsfree(new_incr_filepath);
+ goto cleanup;
+ }
+ latencyEndMonitor(latency);
+ latencyAddSampleIfNeeded("aof-rename", latency);
+ sdsfree(temp_incr_filepath);
+ sdsfree(new_incr_filepath);
+ }
+
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
markRewrittenIncrAofAsHistory(temp_am);
@@ -2553,6 +2612,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
aofDelHistoryFiles();
server.aof_lastbgrewrite_status = C_OK;
+ server.stat_aofrw_consecutive_failures = 0;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
@@ -2563,14 +2623,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = C_ERR;
+ server.stat_aofrw_consecutive_failures++;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
- if (bysignal != SIGUSR1)
+ if (bysignal != SIGUSR1) {
server.aof_lastbgrewrite_status = C_ERR;
+ server.stat_aofrw_consecutive_failures++;
+ }
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
@@ -2578,6 +2641,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
cleanup:
aofRemoveTempFile(server.child_pid);
+ /* Clear AOF buffer and delete temp incr aof for next rewrite. */
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ sdsfree(server.aof_buf);
+ server.aof_buf = sdsempty();
+ aofDelTempIncrAofFile();
+ }
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */