summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-02-28 12:25:59 +0100
committerantirez <antirez@gmail.com>2018-02-28 12:25:59 +0100
commitd69fc0d76a8be9389728588c11bf06143c27528a (patch)
treeb4e68fa7c025f70741dbebc362aa46f2d8152eeb
parent913d600033702fb146e13a627c0368ab2fb8737f (diff)
downloadredis-d69fc0d76a8be9389728588c11bf06143c27528a.tar.gz
WAIT AOF WIP #2. Feature mostly implemented.
-rw-r--r--src/aof.c13
-rw-r--r--src/blocked.c4
-rw-r--r--src/networking.c1
-rw-r--r--src/replication.c79
-rw-r--r--src/server.c6
-rw-r--r--src/server.h14
6 files changed, 84 insertions, 33 deletions
diff --git a/src/aof.c b/src/aof.c
index 9fa98e9d3..3f7d87ad4 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -211,13 +211,18 @@ void aofStartBackgroundFsync(void) {
return;
}
- /* No fsync is in progress. If there was one, the new epoch, now that it
- * termianted, is stored in server.aof_fsync_in_progress_epoch. So
- * update the current fsync epoch. */
+ /* Before starting a new fsync, we need to flush the AOF buffer to
+ * disk if there are clients blocked in WAIT AOF, otherwise we are
+ * not going to sync their data. */
+ if (server.blocked_clients_by_type[BLOCKED_AOF])
+ flushAppendOnlyFile(0);
+
+ /* No fsync is in progress. If there was one, the new epoch is stored
+ * in server.aof_fsync_in_progress_epoch. So update the current fsync
+ * epoch with the one of the fsync in progress. */
server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch;
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL);
server.aof_fsync_in_progress_epoch++;
- handleClientsBlockedForAOF(); /* Unblock clients if we can. */
}
/* Returns an AOF epoch so that, when such epoch is reached by
diff --git a/src/blocked.c b/src/blocked.c
index f438c3353..5eb608fba 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -139,6 +139,8 @@ void unblockClient(client *c) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
+ } else if (c->btype == BLOCKED_AOF) {
+ unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
unblockClientFromModule(c);
} else {
@@ -166,6 +168,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
+ } else if (c->btype == BLOCKED_AOF) {
+ addReply(c,shared.czero);
} else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
diff --git a/src/networking.c b/src/networking.c
index 1c0ac7ff5..d1c1d2869 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -138,6 +138,7 @@ client *createClient(int fd) {
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.numreplicas = 0;
+ c->bpop.aofepoch = 0;
c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate();
diff --git a/src/replication.c b/src/replication.c
index 8c01bfb51..33e220766 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -2394,15 +2394,22 @@ void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
+ int waitaof = 0; /* True if the user requested to wait for AOF sync. */
if (server.masterhost) {
addReplyError(c,"WAIT cannot be used with slave instances. Please also note that since Redis 4.0 if a slave is configured to be writable (which is not the default) writes to slaves are just local and are not propagated.");
return;
}
- /* Argument parsing. */
- if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
- return;
+ /* AOF or number of replicas argument parsing. */
+ if (!strcasecmp(c->argv[1]->ptr,"AOF")) {
+ waitaof = 1;
+ } else {
+ if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
+ return;
+ }
+
+ /* Timeout parsing. */
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= C_OK) return;
@@ -2415,11 +2422,17 @@ void waitCommand(client *c) {
/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
- c->bpop.timeout = timeout;
- c->bpop.reploffset = offset;
- c->bpop.numreplicas = numreplicas;
- listAddNodeTail(server.clients_waiting_acks,c);
- blockClient(c,BLOCKED_WAIT);
+ if (waitaof) {
+ c->bpop.aofepoch = aofNextEpoch();
+ listAddNodeTail(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_AOF);
+ } else {
+ c->bpop.timeout = timeout;
+ c->bpop.reploffset = offset;
+ c->bpop.numreplicas = numreplicas;
+ listAddNodeTail(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_WAIT);
+ }
/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
@@ -2438,7 +2451,7 @@ void unblockClientWaitingReplicas(client *c) {
/* Check if there are clients blocked in WAIT that can be unblocked since
* we received enough ACKs from slaves. */
-void processClientsWaitingReplicas(void) {
+void processClientsBlockedInWait(void) {
long long last_offset = 0;
int last_numreplicas = 0;
@@ -2449,26 +2462,44 @@ void processClientsWaitingReplicas(void) {
while((ln = listNext(&li))) {
client *c = ln->value;
- /* Every time we find a client that is satisfied for a given
- * offset and number of replicas, we remember it so the next client
- * may be unblocked without calling replicationCountAcksByOffset()
- * if the requested offset / replicas were equal or less. */
- if (last_offset && last_offset > c->bpop.reploffset &&
- last_numreplicas > c->bpop.numreplicas)
- {
- unblockClient(c);
- addReplyLongLong(c,last_numreplicas);
+ if (c->btype == BLOCKED_AOF) {
+ /* Handle WAIT AOF. */
+ if (server.aof_fsync == AOF_FSYNC_ALWAYS ||
+ c->bpop.aofepoch <= server.aof_fsync_epoch)
+ {
+ unblockClient(c);
+ addReply(c,shared.cone);
+ }
} else {
- int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
-
- if (numreplicas >= c->bpop.numreplicas) {
- last_offset = c->bpop.reploffset;
- last_numreplicas = numreplicas;
+ /* Handle WAIT for slaves to ACK.
+ *
+ * Every time we find a client that is satisfied for a given
+ * offset and number of replicas, we remember it so the next client
+ * may be unblocked without calling replicationCountAcksByOffset()
+ * if the requested offset / replicas were equal or less. */
+ if (last_offset && last_offset > c->bpop.reploffset &&
+ last_numreplicas > c->bpop.numreplicas)
+ {
unblockClient(c);
- addReplyLongLong(c,numreplicas);
+ addReplyLongLong(c,last_numreplicas);
+ } else {
+ int numreplicas =
+ replicationCountAcksByOffset(c->bpop.reploffset);
+
+ if (numreplicas >= c->bpop.numreplicas) {
+ last_offset = c->bpop.reploffset;
+ last_numreplicas = numreplicas;
+ unblockClient(c);
+ addReplyLongLong(c,numreplicas);
+ }
}
}
}
+
+ /* If after this cycle we have still clients blocked, try to start
+ * a new AOF fsync. If one is already in progress nothig will happen. */
+ if (server.blocked_clients_by_type[BLOCKED_AOF])
+ aofStartBackgroundFsync();
}
/* Return the slave replication offset for this instance, that is
diff --git a/src/server.c b/src/server.c
index 1a6f30381..6bd19f578 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1229,9 +1229,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
}
/* Unblock all the clients blocked for synchronous replication
- * in WAIT. */
+ * or AOF sync in WAIT. */
if (listLength(server.clients_waiting_acks))
- processClientsWaitingReplicas();
+ processClientsBlockedInWait();
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
@@ -1418,6 +1418,8 @@ void initServerConfig(void) {
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
+ server.aof_fsync_in_progress_epoch = 0;
+ server.aof_fsync_epoch = 0;
server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
diff --git a/src/server.h b/src/server.h
index 29919f5ee..ad4534b68 100644
--- a/src/server.h
+++ b/src/server.h
@@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
-#define BLOCKED_NUM 5 /* Number of blocked states. */
+#define BLOCKED_AOF 5 /* WAIT AOF waiting for AOF sync. */
+#define BLOCKED_NUM 6 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -660,6 +661,9 @@ typedef struct blockingState {
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
+ /* BLOCKED_AOF */
+ uint64_t aofepoch; /* WAIT AOF: the sync epoch we are waiting for. */
+
/* BLOCKED_MODULE */
void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only
@@ -1016,7 +1020,7 @@ struct redisServer {
int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
- time_t aof_last_fsync; /* UNIX time of last fsync() */
+ time_t aof_last_fsync; /* UNIX time of last fsync() *attempt* */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* C_OK or C_ERR */
@@ -1026,6 +1030,8 @@ struct redisServer {
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
+ uint64_t aof_fsync_epoch; /* AOF sync epoch used for WAIT AOF. */
+ uint64_t aof_fsync_in_progress_epoch; /* Current ongoing AOF sync epoch. */
/* AOF pipes used to communicate between parent and child during rewrite. */
int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
@@ -1513,7 +1519,7 @@ void replicationScriptCacheInit(void);
void replicationScriptCacheFlush(void);
void replicationScriptCacheAdd(sds sha1);
int replicationScriptCacheExists(sds sha1);
-void processClientsWaitingReplicas(void);
+void processClientsBlockedInWait(void);
void unblockClientWaitingReplicas(client *c);
int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
@@ -1548,6 +1554,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal);
void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
+void aofStartBackgroundFsync(void);
+uint64_t aofNextEpoch(void);
/* Child info */
void openChildInfoPipe(void);