From d69fc0d76a8be9389728588c11bf06143c27528a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 28 Feb 2018 12:25:59 +0100 Subject: WAIT AOF WIP #2. Feature mostly implemented. --- src/aof.c | 13 ++++++--- src/blocked.c | 4 +++ src/networking.c | 1 + src/replication.c | 79 ++++++++++++++++++++++++++++++++++++++----------------- src/server.c | 6 +++-- src/server.h | 14 +++++++--- 6 files changed, 84 insertions(+), 33 deletions(-) diff --git a/src/aof.c b/src/aof.c index 9fa98e9d3..3f7d87ad4 100644 --- a/src/aof.c +++ b/src/aof.c @@ -211,13 +211,18 @@ void aofStartBackgroundFsync(void) { return; } - /* No fsync is in progress. If there was one, the new epoch, now that it - * termianted, is stored in server.aof_fsync_in_progress_epoch. So - * update the current fsync epoch. */ + /* Before starting a new fsync, we need to flush the AOF buffer to + * disk if there are clients blocked in WAIT AOF, otherwise we are + * not going to sync their data. */ + if (server.blocked_clients_by_type[BLOCKED_AOF]) + flushAppendOnlyFile(0); + + /* No fsync is in progress. If there was one, the new epoch is stored + * in server.aof_fsync_in_progress_epoch. So update the current fsync + * epoch with the one of the fsync in progress. */ server.aof_fsync_epoch = server.aof_fsync_in_progress_epoch; bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)server.aof_fd,NULL,NULL); server.aof_fsync_in_progress_epoch++; - handleClientsBlockedForAOF(); /* Unblock clients if we can. 
*/ } /* Returns an AOF epoch so that, when such epoch is reached by diff --git a/src/blocked.c b/src/blocked.c index f438c3353..5eb608fba 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -139,6 +139,8 @@ void unblockClient(client *c) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); + } else if (c->btype == BLOCKED_AOF) { + unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { unblockClientFromModule(c); } else { @@ -166,6 +168,8 @@ void replyToBlockedClientTimedOut(client *c) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); + } else if (c->btype == BLOCKED_AOF) { + addReply(c,shared.czero); } else if (c->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { diff --git a/src/networking.c b/src/networking.c index 1c0ac7ff5..d1c1d2869 100644 --- a/src/networking.c +++ b/src/networking.c @@ -138,6 +138,7 @@ client *createClient(int fd) { c->bpop.target = NULL; c->bpop.xread_group = NULL; c->bpop.numreplicas = 0; + c->bpop.aofepoch = 0; c->bpop.reploffset = 0; c->woff = 0; c->watched_keys = listCreate(); diff --git a/src/replication.c b/src/replication.c index 8c01bfb51..33e220766 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2394,15 +2394,22 @@ void waitCommand(client *c) { mstime_t timeout; long numreplicas, ackreplicas; long long offset = c->woff; + int waitaof = 0; /* True if the user requested to wait for AOF sync. */ if (server.masterhost) { addReplyError(c,"WAIT cannot be used with slave instances. Please also note that since Redis 4.0 if a slave is configured to be writable (which is not the default) writes to slaves are just local and are not propagated."); return; } - /* Argument parsing. */ - if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) - return; + /* AOF or number of replicas argument parsing. 
*/ + if (!strcasecmp(c->argv[1]->ptr,"AOF")) { + waitaof = 1; + } else { + if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) + return; + } + + /* Timeout parsing. */ if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) != C_OK) return; @@ -2415,11 +2422,17 @@ void waitCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - c->bpop.timeout = timeout; - c->bpop.reploffset = offset; - c->bpop.numreplicas = numreplicas; - listAddNodeTail(server.clients_waiting_acks,c); - blockClient(c,BLOCKED_WAIT); + if (waitaof) { + c->bpop.aofepoch = aofNextEpoch(); + listAddNodeTail(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_AOF); + } else { + c->bpop.timeout = timeout; + c->bpop.reploffset = offset; + c->bpop.numreplicas = numreplicas; + listAddNodeTail(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAIT); + } /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -2438,7 +2451,7 @@ void unblockClientWaitingReplicas(client *c) { /* Check if there are clients blocked in WAIT that can be unblocked since * we received enough ACKs from slaves. */ -void processClientsWaitingReplicas(void) { +void processClientsBlockedInWait(void) { long long last_offset = 0; int last_numreplicas = 0; @@ -2449,26 +2462,44 @@ void processClientsWaitingReplicas(void) { while((ln = listNext(&li))) { client *c = ln->value; - /* Every time we find a client that is satisfied for a given - * offset and number of replicas, we remember it so the next client - * may be unblocked without calling replicationCountAcksByOffset() - * if the requested offset / replicas were equal or less. */ - if (last_offset && last_offset > c->bpop.reploffset && - last_numreplicas > c->bpop.numreplicas) - { - unblockClient(c); - addReplyLongLong(c,last_numreplicas); + if (c->btype == BLOCKED_AOF) { + /* Handle WAIT AOF. 
*/ + if (server.aof_fsync == AOF_FSYNC_ALWAYS || + c->bpop.aofepoch <= server.aof_fsync_epoch) + { + unblockClient(c); + addReply(c,shared.cone); + } } else { - int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); - - if (numreplicas >= c->bpop.numreplicas) { - last_offset = c->bpop.reploffset; - last_numreplicas = numreplicas; + /* Handle WAIT for slaves to ACK. + * + * Every time we find a client that is satisfied for a given + * offset and number of replicas, we remember it so the next client + * may be unblocked without calling replicationCountAcksByOffset() + * if the requested offset / replicas were equal or less. */ + if (last_offset && last_offset > c->bpop.reploffset && + last_numreplicas > c->bpop.numreplicas) + { unblockClient(c); - addReplyLongLong(c,numreplicas); + addReplyLongLong(c,last_numreplicas); + } else { + int numreplicas = + replicationCountAcksByOffset(c->bpop.reploffset); + + if (numreplicas >= c->bpop.numreplicas) { + last_offset = c->bpop.reploffset; + last_numreplicas = numreplicas; + unblockClient(c); + addReplyLongLong(c,numreplicas); + } } } } + + /* If after this cycle we still have clients blocked, try to start + * a new AOF fsync. If one is already in progress nothing will happen. */ + if (server.blocked_clients_by_type[BLOCKED_AOF]) + aofStartBackgroundFsync(); } /* Return the slave replication offset for this instance, that is diff --git a/src/server.c b/src/server.c index 1a6f30381..6bd19f578 100644 --- a/src/server.c +++ b/src/server.c @@ -1229,9 +1229,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ + * or AOF sync in WAIT. */ if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + processClientsBlockedInWait(); /* Check if there are clients unblocked by modules that implement * blocking commands. 
*/ @@ -1418,6 +1418,8 @@ void initServerConfig(void) { server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; + server.aof_fsync_in_progress_epoch = 0; + server.aof_fsync_epoch = 0; server.pidfile = NULL; server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); diff --git a/src/server.h b/src/server.h index 29919f5ee..ad4534b68 100644 --- a/src/server.h +++ b/src/server.h @@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ -#define BLOCKED_NUM 5 /* Number of blocked states. */ +#define BLOCKED_AOF 5 /* WAIT AOF waiting for AOF sync. */ +#define BLOCKED_NUM 6 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -660,6 +661,9 @@ typedef struct blockingState { int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */ + /* BLOCKED_AOF */ + uint64_t aofepoch; /* WAIT AOF: the sync epoch we are waiting for. */ + /* BLOCKED_MODULE */ void *module_blocked_handle; /* RedisModuleBlockedClient structure. which is opaque for the Redis core, only @@ -1016,7 +1020,7 @@ struct redisServer { int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ - time_t aof_last_fsync; /* UNIX time of last fsync() */ + time_t aof_last_fsync; /* UNIX time of last fsync() *attempt* */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. 
*/ int aof_lastbgrewrite_status; /* C_OK or C_ERR */ @@ -1026,6 +1030,8 @@ struct redisServer { int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ + uint64_t aof_fsync_epoch; /* AOF sync epoch used for WAIT AOF. */ + uint64_t aof_fsync_in_progress_epoch; /* Current ongoing AOF sync epoch. */ /* AOF pipes used to communicate between parent and child during rewrite. */ int aof_pipe_write_data_to_child; int aof_pipe_read_data_from_parent; @@ -1513,7 +1519,7 @@ void replicationScriptCacheInit(void); void replicationScriptCacheFlush(void); void replicationScriptCacheAdd(sds sha1); int replicationScriptCacheExists(sds sha1); -void processClientsWaitingReplicas(void); +void processClientsBlockedInWait(void); void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); void replicationSendNewlineToMaster(void); @@ -1548,6 +1554,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal); void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); ssize_t aofReadDiffFromParent(void); +void aofStartBackgroundFsync(void); +uint64_t aofNextEpoch(void); /* Child info */ void openChildInfoPipe(void); -- cgit v1.2.1