summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c29
1 files changed, 26 insertions, 3 deletions
diff --git a/src/aof.c b/src/aof.c
index e7537742d..12bd74376 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -920,12 +920,12 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
- bioCreateFsyncJob(fd);
+ bioCreateFsyncJob(fd, server.master_repl_offset, 1);
}
/* Close the fd on the basis of aof_background_fsync. */
void aof_background_fsync_and_close(int fd) {
- bioCreateCloseJob(fd, 1, 1);
+ bioCreateCloseAofJob(fd, server.master_repl_offset, 1);
}
/* Kills an AOFRW child process if exists */
@@ -962,6 +962,8 @@ void stopAppendOnly(void) {
server.aof_state = AOF_OFF;
server.aof_rewrite_scheduled = 0;
server.aof_last_incr_size = 0;
+ server.fsynced_reploff = -1;
+ atomicSet(server.fsynced_reploff_pending, 0);
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
@@ -972,6 +974,18 @@ void stopAppendOnly(void) {
int startAppendOnly(void) {
serverAssert(server.aof_state == AOF_OFF);
+ /* Wait for all bio jobs related to AOF to drain. This prevents a race
+ * between updates to `fsynced_reploff_pending` of the worker thread, belonging
+ * to the previous AOF, and the new one. This concern is specific for a full
+ * sync scenario where we don't want to risk the ACKed replication offset
+ * jumping backward or forward when switching to a different master. */
+ bioDrainWorker(BIO_AOF_FSYNC);
+
+ /* Set the initial repl_offset, which will be applied to fsynced_reploff
+ * when AOFRW finishes (after possibly being updated by a bio thread) */
+ atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
+ server.fsynced_reploff = 0;
+
server.aof_state = AOF_WAIT_REWRITE;
if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
server.aof_rewrite_scheduled = 1;
@@ -1241,6 +1255,7 @@ try_fsync:
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
+ atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
@@ -2669,9 +2684,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
- if (server.aof_state == AOF_WAIT_REWRITE)
+ if (server.aof_state == AOF_WAIT_REWRITE) {
server.aof_state = AOF_ON;
+ /* Update the fsynced replication offset that has just now become valid.
+ * This could either be the one we took in startAppendOnly, or a
+ * newer one set by the bio thread. */
+ long long fsynced_reploff_pending;
+ atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
+ server.fsynced_reploff = fsynced_reploff_pending;
+ }
+
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {