summary | refs | log | tree | commit | diff
path: root/src/aof.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aof.c')
-rw-r--r-- src/aof.c 96
1 files changed, 61 insertions, 35 deletions
diff --git a/src/aof.c b/src/aof.c
index d3191277f..6753e8bcc 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -235,6 +235,8 @@ void stopAppendOnly(void) {
serverAssert(server.aof_state != AOF_OFF);
flushAppendOnlyFile(1);
redis_fsync(server.aof_fd);
+ server.aof_fsync_offset = server.aof_current_size;
+ server.aof_last_fsync = server.unixtime;
close(server.aof_fd);
server.aof_fd = -1;
@@ -242,6 +244,8 @@ void stopAppendOnly(void) {
server.aof_state = AOF_OFF;
server.aof_rewrite_scheduled = 0;
killAppendOnlyChild();
+ sdsfree(server.aof_buf);
+ server.aof_buf = sdsempty();
}
/* Called when the user switches from "appendonly no" to "appendonly yes"
@@ -285,6 +289,12 @@ int startAppendOnly(void) {
server.aof_state = AOF_WAIT_REWRITE;
server.aof_last_fsync = server.unixtime;
server.aof_fd = newfd;
+
+ /* If AOF was in error state, we just ignore it and log the event. */
+ if (server.aof_last_write_status == C_ERR) {
+ serverLog(LL_WARNING,"AOF reopen, just ignore the last error.");
+ server.aof_last_write_status = C_OK;
+ }
return C_OK;
}
@@ -451,10 +461,11 @@ void flushAppendOnlyFile(int force) {
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
- /* We can't recover when the fsync policy is ALWAYS since the
- * reply for the client is already in the output buffers, and we
- * have the contract with the user that on acknowledged write data
- * is synced on disk. */
+ /* We can't recover when the fsync policy is ALWAYS since the reply
+ * for the client is already in the output buffers (both writes and
+ * reads), and the changes to the db can't be rolled back. Since we
+ * have a contract with the user that acknowledged or observed
+ * writes are synced on disk, we must exit. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
@@ -502,7 +513,14 @@ try_fsync:
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
- redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
+ /* Let's try to get this data on the disk. To guarantee data safety when
+ * the AOF fsync policy is 'always', we should exit if we fail to fsync
+ * the AOF (see the comment next to the exit(1) after the write error above). */
+ if (redis_fsync(server.aof_fd) == -1) {
+ serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
+ "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
+ exit(1);
+ }
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
@@ -581,8 +599,6 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
- robj *tmpargv[3];
-
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
@@ -598,32 +614,31 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
- } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
- /* Translate SETEX/PSETEX to SET and PEXPIREAT */
- tmpargv[0] = createStringObject("SET",3);
- tmpargv[1] = argv[1];
- tmpargv[2] = argv[3];
- buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
- decrRefCount(tmpargv[0]);
- buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
- int i;
- robj *exarg = NULL, *pxarg = NULL;
- for (i = 3; i < argc; i ++) {
- if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
- if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
+ robj *pxarg = NULL;
+ /* When SET is used with an EX/PX argument, setGenericCommand propagates it as a PX argument in milliseconds.
+ * Since the command arguments are rewritten there, we can rely here on the index of PX being 3. */
+ if (!strcasecmp(argv[3]->ptr, "px")) {
+ pxarg = argv[4];
}
- serverAssert(!(exarg && pxarg));
-
- if (exarg || pxarg) {
- /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
- buf = catAppendOnlyGenericCommand(buf,3,argv);
- if (exarg)
- buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
- exarg);
- if (pxarg)
- buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
- pxarg);
+ /* For AOF we convert a SET with a relative expire time in milliseconds into a SET with an absolute
+ * expire time in milliseconds. Whenever the condition is true, it implies that the original SET has
+ * been transformed to SET PX with a millisecond time argument, so we do not need to worry about the unit here. */
+ if (pxarg) {
+ robj *millisecond = getDecodedObject(pxarg);
+ long long when = strtoll(millisecond->ptr,NULL,10);
+ when += mstime();
+
+ decrRefCount(millisecond);
+
+ robj *newargs[5];
+ newargs[0] = argv[0];
+ newargs[1] = argv[1];
+ newargs[2] = argv[2];
+ newargs[3] = shared.pxat;
+ newargs[4] = createStringObjectFromLongLong(when);
+ buf = catAppendOnlyGenericCommand(buf,5,newargs);
+ decrRefCount(newargs[4]);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
@@ -1852,6 +1867,20 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
+
+ if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {
+ aof_background_fsync(newfd);
+ } else if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
+ latencyStartMonitor(latency);
+ if (redis_fsync(newfd) == -1) {
+ serverLog(LL_WARNING,
+ "Error trying to fsync the parent diff to the rewritten AOF: %s", strerror(errno));
+ close(newfd);
+ goto cleanup;
+ }
+ latencyEndMonitor(latency);
+ latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency);
+ }
serverLog(LL_NOTICE,
"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
@@ -1919,14 +1948,11 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd;
server.aof_fd = newfd;
- if (server.aof_fsync == AOF_FSYNC_ALWAYS)
- redis_fsync(newfd);
- else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
- aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
+ server.aof_last_fsync = server.unixtime;
/* Clear regular AOF buffer since its contents was just written to
* the new AOF from the background rewrite buffer. */