diff options
Diffstat (limited to 'src/aof.c')
-rw-r--r-- | src/aof.c | 96 |
1 files changed, 61 insertions, 35 deletions
@@ -235,6 +235,8 @@ void stopAppendOnly(void) { serverAssert(server.aof_state != AOF_OFF); flushAppendOnlyFile(1); redis_fsync(server.aof_fd); + server.aof_fsync_offset = server.aof_current_size; + server.aof_last_fsync = server.unixtime; close(server.aof_fd); server.aof_fd = -1; @@ -242,6 +244,8 @@ void stopAppendOnly(void) { server.aof_state = AOF_OFF; server.aof_rewrite_scheduled = 0; killAppendOnlyChild(); + sdsfree(server.aof_buf); + server.aof_buf = sdsempty(); } /* Called when the user switches from "appendonly no" to "appendonly yes" @@ -285,6 +289,12 @@ int startAppendOnly(void) { server.aof_state = AOF_WAIT_REWRITE; server.aof_last_fsync = server.unixtime; server.aof_fd = newfd; + + /* If AOF was in error state, we just ignore it and log the event. */ + if (server.aof_last_write_status == C_ERR) { + serverLog(LL_WARNING,"AOF reopen, just ignore the last error."); + server.aof_last_write_status = C_OK; + } return C_OK; } @@ -451,10 +461,11 @@ void flushAppendOnlyFile(int force) { /* Handle the AOF write error. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { - /* We can't recover when the fsync policy is ALWAYS since the - * reply for the client is already in the output buffers, and we - * have the contract with the user that on acknowledged write data - * is synced on disk. */ + /* We can't recover when the fsync policy is ALWAYS since the reply + * for the client is already in the output buffers (both writes and + * reads), and the changes to the db can't be rolled back. Since we + * have a contract with the user that on acknowledged or observed + * writes are is synced on disk, we must exit. */ serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { @@ -502,7 +513,14 @@ try_fsync: /* redis_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); - redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */ + /* Let's try to get this data on the disk. To guarantee data safe when + * the AOF fsync policy is 'always', we should exit if failed to fsync + * AOF (see comment next to the exit(1) after write error above). */ + if (redis_fsync(server.aof_fd) == -1) { + serverLog(LL_WARNING,"Can't persist AOF for fsync error when the " + "AOF fsync policy is 'always': %s. Exiting...", strerror(errno)); + exit(1); + } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_fsync_offset = server.aof_current_size; @@ -581,8 +599,6 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); - robj *tmpargv[3]; - /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ if (dictid != server.aof_selected_db) { @@ -598,32 +614,31 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); - } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { - /* Translate SETEX/PSETEX to SET and PEXPIREAT */ - tmpargv[0] = createStringObject("SET",3); - tmpargv[1] = argv[1]; - tmpargv[2] = argv[3]; - buf = catAppendOnlyGenericCommand(buf,3,tmpargv); - decrRefCount(tmpargv[0]); - buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else if (cmd->proc == setCommand && argc > 3) { - int i; - robj *exarg = NULL, *pxarg = NULL; - for (i = 3; i < argc; i ++) { - if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1]; - if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1]; + robj *pxarg = NULL; + /* When SET is used with EX/PX argument setGenericCommand propagates them with PX millisecond argument. + * So since the command arguments are re-written there, we can rely here on the index of PX being 3. */ + if (!strcasecmp(argv[3]->ptr, "px")) { + pxarg = argv[4]; } - serverAssert(!(exarg && pxarg)); - - if (exarg || pxarg) { - /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */ - buf = catAppendOnlyGenericCommand(buf,3,argv); - if (exarg) - buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1], - exarg); - if (pxarg) - buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1], - pxarg); + /* For AOF we convert SET key value relative time in milliseconds to SET key value absolute time in + * millisecond. Whenever the condition is true it implies that original SET has been transformed + * to SET PX with millisecond time argument so we do not need to worry about unit here.*/ + if (pxarg) { + robj *millisecond = getDecodedObject(pxarg); + long long when = strtoll(millisecond->ptr,NULL,10); + when += mstime(); + + decrRefCount(millisecond); + + robj *newargs[5]; + newargs[0] = argv[0]; + newargs[1] = argv[1]; + newargs[2] = argv[2]; + newargs[3] = shared.pxat; + newargs[4] = createStringObjectFromLongLong(when); + buf = catAppendOnlyGenericCommand(buf,5,newargs); + decrRefCount(newargs[4]); } else { buf = catAppendOnlyGenericCommand(buf,argc,argv); } @@ -1852,6 +1867,20 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency); + + if (server.aof_fsync == AOF_FSYNC_EVERYSEC) { + aof_background_fsync(newfd); + } else if (server.aof_fsync == AOF_FSYNC_ALWAYS) { + latencyStartMonitor(latency); + if (redis_fsync(newfd) == -1) { + serverLog(LL_WARNING, + "Error trying to fsync the parent diff to the rewritten AOF: %s", strerror(errno)); + close(newfd); + goto cleanup; + } + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency); + } serverLog(LL_NOTICE, "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024)); @@ -1919,14 +1948,11 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* AOF enabled, replace the old fd with the new one. */ oldfd = server.aof_fd; server.aof_fd = newfd; - if (server.aof_fsync == AOF_FSYNC_ALWAYS) - redis_fsync(newfd); - else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) - aof_background_fsync(newfd); server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; + server.aof_last_fsync = server.unixtime; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ |