summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSlava Koyfman <slavak@users.noreply.github.com>2023-03-14 20:26:21 +0200
committerGitHub <noreply@github.com>2023-03-14 20:26:21 +0200
commit9344f654c6227f30adf8bafd49b4218357c438b7 (patch)
tree89dd758fad385c62ccb166d7f8971eb93589a215
parent7997874f4d2be9da1e26c77804569b0057c1e0a2 (diff)
downloadredis-9344f654c6227f30adf8bafd49b4218357c438b7.tar.gz
Implementing the WAITAOF command (issue #10505) (#11713)
Implementing the WAITAOF functionality which would allow the user to block until a specified number of Redises have fsynced all previous write commands to the AOF. Syntax: `WAITAOF <num_local> <num_replicas> <timeout>` Response: Array containing two elements: num_local, num_replicas num_local is always either 0 or 1 representing the local AOF on the master. num_replicas is the number of replicas that acknowledged the a replication offset of the last write being fsynced to the AOF. Returns an error when called on replicas, or when called with non-zero num_local on a master with AOF disabled, in all other cases the response just contains number of fsync copies. Main changes: * Added code to keep track of replication offsets that are confirmed to have been fsynced to disk. * Keep advancing master_repl_offset even when replication is disabled (and there's no replication backlog, only if there's an AOF enabled). This way we can use this command and it's mechanisms even when replication is disabled. * Extend REPLCONF ACK to `REPLCONF ACK <ofs> FACK <ofs>`, the FACK will be appended only if there's an AOF on the replica, and already ignored on old masters (thus backwards compatible) * WAIT now no longer wait for the replication offset after your last command, but rather the replication offset after your last write (or read command that caused propagation, e.g. lazy expiry). Unrelated changes: * WAIT command respects CLIENT_DENY_BLOCKING (not just CLIENT_MULTI) Implementation details: * Add an atomic var named `fsynced_reploff_pending` that's updated (usually by the bio thread) and later copied to the main `fsynced_reploff` variable (only if the AOF base file exists). I.e. during the initial AOF rewrite it will not be used as the fsynced offset since the AOF base is still missing. * Replace close+fsync bio job with new BIO_CLOSE_AOF (AOF specific) job that will also update fsync offset the field. * Handle all AOF jobs (BIO_CLOSE_AOF, BIO_AOF_FSYNC) in the same bio worker thread, to impose ordering on their execution. This solves a race condition where a job could set `fsynced_reploff_pending` to a higher value than another pending fsync job, resulting in indicating an offset for which parts of the data have not yet actually been fsynced. Imposing an ordering on the jobs guarantees that fsync jobs are executed in increasing order of replication offset. * Drain bio jobs when switching `appendfsync` to "always" This should prevent a write race between updates to `fsynced_reploff_pending` in the main thread (`flushAppendOnlyFile` when set to ALWAYS fsync), and those done in the bio thread. * Drain the pending fsync when starting over a new AOF to avoid race conditions with the previous AOF offsets overriding the new one (e.g. after switching to replicate from a new master). * Make sure to update the fsynced offset at the end of the initial AOF rewrite. a must in case there are no additional writes that trigger a periodic fsync, specifically for a replica that does a full sync. Limitations: It is possible to write a module and a Lua script that propagate to the AOF and doesn't propagate to the replication stream. see REDISMODULE_ARGV_NO_REPLICAS and luaRedisSetReplCommand. These features are incompatible with the WAITAOF command, and can result in two bad cases. The scenario is that the user executes command that only propagates to AOF, and then immediately issues a WAITAOF, and there's no further writes on the replication stream after that. 1. if the the last thing that happened on the replication stream is a PING (which increased the replication offset but won't trigger an fsync on the replica), then the client would hang forever (will wait for an fack that the replica will never send sine it doesn't trigger any fsyncs). 2. if the last thing that happened is a write command that got propagated properly, then WAITAOF will be released immediately, without waiting for an fsync (since the offset didn't change) Refactoring: * Plumbing to allow bio worker to handle multiple job types This introduces infrastructure necessary to allow BIO workers to not have a 1-1 mapping of worker to job-type. This allows in the future to assign multiple job types to a single worker, either as a performance/resource optimization, or as a way of enforcing ordering between specific classes of jobs. Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/aof.c29
-rw-r--r--src/bio.c170
-rw-r--r--src/bio.h15
-rw-r--r--src/blocked.c16
-rw-r--r--src/commands.c21
-rw-r--r--src/commands/waitaof.json52
-rw-r--r--src/config.c14
-rw-r--r--src/networking.c1
-rw-r--r--src/replication.c119
-rw-r--r--src/server.c20
-rw-r--r--src/server.h13
-rw-r--r--tests/support/util.tcl4
-rw-r--r--tests/unit/wait.tcl231
13 files changed, 623 insertions, 82 deletions
diff --git a/src/aof.c b/src/aof.c
index e7537742d..12bd74376 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -920,12 +920,12 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
- bioCreateFsyncJob(fd);
+ bioCreateFsyncJob(fd, server.master_repl_offset, 1);
}
/* Close the fd on the basis of aof_background_fsync. */
void aof_background_fsync_and_close(int fd) {
- bioCreateCloseJob(fd, 1, 1);
+ bioCreateCloseAofJob(fd, server.master_repl_offset, 1);
}
/* Kills an AOFRW child process if exists */
@@ -962,6 +962,8 @@ void stopAppendOnly(void) {
server.aof_state = AOF_OFF;
server.aof_rewrite_scheduled = 0;
server.aof_last_incr_size = 0;
+ server.fsynced_reploff = -1;
+ atomicSet(server.fsynced_reploff_pending, 0);
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
@@ -972,6 +974,18 @@ void stopAppendOnly(void) {
int startAppendOnly(void) {
serverAssert(server.aof_state == AOF_OFF);
+ /* Wait for all bio jobs related to AOF to drain. This prevents a race
+ * between updates to `fsynced_reploff_pending` of the worker thread, belonging
+ * to the previous AOF, and the new one. This concern is specific for a full
+ * sync scenario where we don't wanna risk the ACKed replication offset
+ * jumping backwards or forward when switching to a different master. */
+ bioDrainWorker(BIO_AOF_FSYNC);
+
+ /* Set the initial repl_offset, which will be applied to fsynced_reploff
+ * when AOFRW finishes (after possibly being updated by a bio thread) */
+ atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
+ server.fsynced_reploff = 0;
+
server.aof_state = AOF_WAIT_REWRITE;
if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
server.aof_rewrite_scheduled = 1;
@@ -1241,6 +1255,7 @@ try_fsync:
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
+ atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
@@ -2669,9 +2684,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
- if (server.aof_state == AOF_WAIT_REWRITE)
+ if (server.aof_state == AOF_WAIT_REWRITE) {
server.aof_state = AOF_ON;
+ /* Update the fsynced replication offset that just now become valid.
+ * This could either be the one we took in startAppendOnly, or a
+ * newer one set by the bio thread. */
+ long long fsynced_reploff_pending;
+ atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
+ server.fsynced_reploff = fsynced_reploff_pending;
+ }
+
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
diff --git a/src/bio.c b/src/bio.c
index 7eb43a3a3..640fbb024 100644
--- a/src/bio.c
+++ b/src/bio.c
@@ -15,13 +15,15 @@
* DESIGN
* ------
*
- * The design is trivial, we have a structure representing a job to perform
- * and a different thread and job queue for every job type.
- * Every thread waits for new jobs in its queue, and process every job
+ * The design is simple: We have a structure representing a job to perform,
+ * and several worker threads and job queues. Every job type is assigned to
+ * a specific worker thread, and a single worker may handle several different
+ * job types.
+ * Every thread waits for new jobs in its queue, and processes every job
* sequentially.
*
- * Jobs of the same type are guaranteed to be processed from the least
- * recently inserted to the most recently inserted (older jobs processed
+ * Jobs handled by the same worker are guaranteed to be processed from the
+ * least-recently-inserted to the most-recently-inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
@@ -61,17 +63,39 @@
#include "server.h"
#include "bio.h"
-static pthread_t bio_threads[BIO_NUM_OPS];
-static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
-static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
-static list *bio_jobs[BIO_NUM_OPS];
+static char* bio_worker_title[] = {
+ "bio_close_file",
+ "bio_aof",
+ "bio_lazy_free",
+};
+
+#define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title))
+
+static unsigned int bio_job_to_worker[] = {
+ [BIO_CLOSE_FILE] = 0,
+ [BIO_AOF_FSYNC] = 1,
+ [BIO_CLOSE_AOF] = 1,
+ [BIO_LAZY_FREE] = 2,
+};
+
+static pthread_t bio_threads[BIO_WORKER_NUM];
+static pthread_mutex_t bio_mutex[BIO_WORKER_NUM];
+static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
+static list *bio_jobs[BIO_WORKER_NUM];
+static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
typedef union bio_job {
+ struct {
+ int type; /* Job-type tag. This needs to appear as the first element in all union members. */
+ } header;
+
/* Job specific arguments.*/
struct {
+ int type;
int fd; /* Fd for file based background jobs */
+ long long offset; /* A job-specific offset, if applicable */
unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
* the file is closed. */
unsigned need_reclaim_cache:1; /* A flag to indicate that reclaim cache is required before
@@ -79,6 +103,7 @@ typedef union bio_job {
} fd_args;
struct {
+ int type;
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
@@ -95,10 +120,10 @@ void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
- int j;
+ unsigned long j;
/* Initialization of state vars and objects */
- for (j = 0; j < BIO_NUM_OPS; j++) {
+ for (j = 0; j < BIO_WORKER_NUM; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
bio_jobs[j] = listCreate();
@@ -113,8 +138,8 @@ void bioInit(void) {
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
- * responsible of. */
- for (j = 0; j < BIO_NUM_OPS; j++) {
+ * responsible for. */
+ for (j = 0; j < BIO_WORKER_NUM; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
@@ -125,10 +150,13 @@ void bioInit(void) {
}
void bioSubmitJob(int type, bio_job *job) {
- pthread_mutex_lock(&bio_mutex[type]);
- listAddNodeTail(bio_jobs[type],job);
- pthread_cond_signal(&bio_newjob_cond[type]);
- pthread_mutex_unlock(&bio_mutex[type]);
+ job->header.type = type;
+ unsigned long worker = bio_job_to_worker[type];
+ pthread_mutex_lock(&bio_mutex[worker]);
+ listAddNodeTail(bio_jobs[worker],job);
+ bio_jobs_counter[type]++;
+ pthread_cond_signal(&bio_newjob_cond[worker]);
+ pthread_mutex_unlock(&bio_mutex[worker]);
}
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
@@ -155,42 +183,40 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
bioSubmitJob(BIO_CLOSE_FILE, job);
}
-void bioCreateFsyncJob(int fd) {
+void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
+ job->fd_args.offset = offset;
+ job->fd_args.need_fsync = 1;
+ job->fd_args.need_reclaim_cache = need_reclaim_cache;
+
+ bioSubmitJob(BIO_CLOSE_AOF, job);
+}
+
+void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) {
+ bio_job *job = zmalloc(sizeof(*job));
+ job->fd_args.fd = fd;
+ job->fd_args.offset = offset;
+ job->fd_args.need_reclaim_cache = need_reclaim_cache;
bioSubmitJob(BIO_AOF_FSYNC, job);
}
void *bioProcessBackgroundJobs(void *arg) {
bio_job *job;
- unsigned long type = (unsigned long) arg;
+ unsigned long worker = (unsigned long) arg;
sigset_t sigset;
- /* Check that the type is within the right interval. */
- if (type >= BIO_NUM_OPS) {
- serverLog(LL_WARNING,
- "Warning: bio thread started with wrong type %lu",type);
- return NULL;
- }
+ /* Check that the worker is within the right interval. */
+ serverAssert(worker < BIO_WORKER_NUM);
- switch (type) {
- case BIO_CLOSE_FILE:
- redis_set_thread_title("bio_close_file");
- break;
- case BIO_AOF_FSYNC:
- redis_set_thread_title("bio_aof_fsync");
- break;
- case BIO_LAZY_FREE:
- redis_set_thread_title("bio_lazy_free");
- break;
- }
+ redis_set_thread_title(bio_worker_title[worker]);
redisSetCpuAffinity(server.bio_cpulist);
makeThreadKillable();
- pthread_mutex_lock(&bio_mutex[type]);
+ pthread_mutex_lock(&bio_mutex[worker]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
@@ -203,21 +229,26 @@ void *bioProcessBackgroundJobs(void *arg) {
listNode *ln;
/* The loop always starts with the lock hold. */
- if (listLength(bio_jobs[type]) == 0) {
- pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
+ if (listLength(bio_jobs[worker]) == 0) {
+ pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]);
continue;
}
- /* Pop the job from the queue. */
- ln = listFirst(bio_jobs[type]);
+ /* Get the job from the queue. */
+ ln = listFirst(bio_jobs[worker]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
- pthread_mutex_unlock(&bio_mutex[type]);
+ pthread_mutex_unlock(&bio_mutex[worker]);
/* Process the job accordingly to its type. */
- if (type == BIO_CLOSE_FILE) {
- if (job->fd_args.need_fsync) {
- redis_fsync(job->fd_args.fd);
+ int job_type = job->header.type;
+
+ if (job_type == BIO_CLOSE_FILE) {
+ if (job->fd_args.need_fsync &&
+ redis_fsync(job->fd_args.fd) == -1 &&
+ errno != EBADF && errno != EINVAL)
+ {
+ serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno));
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
@@ -225,7 +256,7 @@ void *bioProcessBackgroundJobs(void *arg) {
}
}
close(job->fd_args.fd);
- } else if (type == BIO_AOF_FSYNC) {
+ } else if (job_type == BIO_AOF_FSYNC || job_type == BIO_CLOSE_AOF) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
@@ -242,8 +273,17 @@ void *bioProcessBackgroundJobs(void *arg) {
}
} else {
atomicSet(server.aof_bio_fsync_status,C_OK);
+ atomicSet(server.fsynced_reploff_pending, job->fd_args.offset);
}
- } else if (type == BIO_LAZY_FREE) {
+
+ if (job->fd_args.need_reclaim_cache) {
+ if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
+ serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
+ }
+ }
+ if (job_type == BIO_CLOSE_AOF)
+ close(job->fd_args.fd);
+ } else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
@@ -252,37 +292,53 @@ void *bioProcessBackgroundJobs(void *arg) {
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
- pthread_mutex_lock(&bio_mutex[type]);
- listDelNode(bio_jobs[type],ln);
+ pthread_mutex_lock(&bio_mutex[worker]);
+ listDelNode(bio_jobs[worker], ln);
+ bio_jobs_counter[job_type]--;
+ pthread_cond_signal(&bio_newjob_cond[worker]);
}
}
/* Return the number of pending jobs of the specified type. */
unsigned long bioPendingJobsOfType(int type) {
- unsigned long long val;
- pthread_mutex_lock(&bio_mutex[type]);
- val = listLength(bio_jobs[type]);
- pthread_mutex_unlock(&bio_mutex[type]);
+ unsigned int worker = bio_job_to_worker[type];
+
+ pthread_mutex_lock(&bio_mutex[worker]);
+ unsigned long val = bio_jobs_counter[type];
+ pthread_mutex_unlock(&bio_mutex[worker]);
+
return val;
}
+/* Wait for the job queue of the worker for jobs of specified type to become empty. */
+void bioDrainWorker(int job_type) {
+ unsigned long worker = bio_job_to_worker[job_type];
+
+ pthread_mutex_lock(&bio_mutex[worker]);
+ while (listLength(bio_jobs[worker]) > 0) {
+ pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]);
+ }
+ pthread_mutex_unlock(&bio_mutex[worker]);
+}
+
/* Kill the running bio threads in an unclean way. This function should be
* used only when it's critical to stop the threads for some reason.
* Currently Redis does this only on crash (for instance on SIGSEGV) in order
* to perform a fast memory check without other threads messing with memory. */
void bioKillThreads(void) {
- int err, j;
+ int err;
+ unsigned long j;
- for (j = 0; j < BIO_NUM_OPS; j++) {
+ for (j = 0; j < BIO_WORKER_NUM; j++) {
if (bio_threads[j] == pthread_self()) continue;
if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {
if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
serverLog(LL_WARNING,
- "Bio thread for job type #%d can not be joined: %s",
+ "Bio worker thread #%lu can not be joined: %s",
j, strerror(err));
} else {
serverLog(LL_WARNING,
- "Bio thread for job type #%d terminated",j);
+ "Bio worker thread #%lu terminated",j);
}
}
}
diff --git a/src/bio.h b/src/bio.h
index 52c80fdb6..0d1fe9b4b 100644
--- a/src/bio.h
+++ b/src/bio.h
@@ -35,15 +35,20 @@ typedef void lazy_free_fn(void *args[]);
/* Exported API */
void bioInit(void);
unsigned long bioPendingJobsOfType(int type);
+void bioDrainWorker(int job_type);
void bioKillThreads(void);
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
-void bioCreateFsyncJob(int fd);
+void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
+void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
/* Background job opcodes */
-#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
-#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
-#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
-#define BIO_NUM_OPS 3
+enum {
+ BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
+ BIO_AOF_FSYNC, /* Deferred AOF fsync. */
+ BIO_LAZY_FREE, /* Deferred objects freeing. */
+ BIO_CLOSE_AOF, /* Deferred close for AOF files. */
+ BIO_NUM_OPS
+};
#endif
diff --git a/src/blocked.c b/src/blocked.c
index 753442c2b..1f5fba01b 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -177,7 +177,7 @@ void unblockClient(client *c) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
- } else if (c->bstate.btype == BLOCKED_WAIT) {
+ } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
@@ -225,6 +225,10 @@ void replyToBlockedClientTimedOut(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset));
+ } else if (c->bstate.btype == BLOCKED_WAITAOF) {
+ addReplyArrayLen(c,2);
+ addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
+ addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
@@ -583,6 +587,16 @@ void blockForReplication(client *c, mstime_t timeout, long long offset, long num
blockClient(c,BLOCKED_WAIT);
}
+/* block a client due to waitaof command */
+void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
+ c->bstate.timeout = timeout;
+ c->bstate.reploffset = offset;
+ c->bstate.numreplicas = numreplicas;
+ c->bstate.numlocal = numlocal;
+ listAddNodeHead(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_WAITAOF);
+}
+
/* Postpone client from executing a command. For example the server might be busy
* requesting to avoid processing clients commands which will be processed later
* when the it is ready to accept them. */
diff --git a/src/commands.c b/src/commands.c
index 5eb422d35..7e2fe6f4e 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -1837,6 +1837,26 @@ struct redisCommandArg WAIT_Args[] = {
{0}
};
+/********** WAITAOF ********************/
+
+/* WAITAOF history */
+#define WAITAOF_History NULL
+
+/* WAITAOF tips */
+const char *WAITAOF_tips[] = {
+"request_policy:all_shards",
+"response_policy:agg_min",
+NULL
+};
+
+/* WAITAOF argument table */
+struct redisCommandArg WAITAOF_Args[] = {
+{"numlocal",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{"numreplicas",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{"timeout",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
/********** GEOADD ********************/
/* GEOADD history */
@@ -7244,6 +7264,7 @@ struct redisCommand redisCommandTable[] = {
{"type","Determine the type stored at key","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,TYPE_History,TYPE_tips,typeCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=TYPE_Args},
{"unlink","Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.","O(1) for each key removed regardless of its size. Then the command does O(N) work in a different thread in order to reclaim memory, where N is the number of allocations the deleted objects where composed of.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,UNLINK_History,UNLINK_tips,unlinkCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RM|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=UNLINK_Args},
{"wait","Wait for the synchronous replication of all the write commands sent in the context of the current connection","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAIT_History,WAIT_tips,waitCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAIT_Args},
+{"waitaof","Wait for all write commands sent in the context of the current connection to be synced to AOF of local host and/or replicas","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAITAOF_History,WAITAOF_tips,waitaofCommand,4,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAITAOF_Args},
/* geo */
{"geoadd","Add one or more geospatial items in the geospatial index represented using a sorted set","O(log(N)) for each item added, where N is the number of elements in the sorted set.","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GEO,GEOADD_History,GEOADD_tips,geoaddCommand,-5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_GEO,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GEOADD_Args},
{"geodist","Returns the distance between two members of a geospatial index","O(log(N))","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GEO,GEODIST_History,GEODIST_tips,geodistCommand,-4,CMD_READONLY,ACL_CATEGORY_GEO,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GEODIST_Args},
diff --git a/src/commands/waitaof.json b/src/commands/waitaof.json
new file mode 100644
index 000000000..bea18cae9
--- /dev/null
+++ b/src/commands/waitaof.json
@@ -0,0 +1,52 @@
+{
+ "WAITAOF": {
+ "summary": "Wait for all write commands sent in the context of the current connection to be synced to AOF of local host and/or replicas",
+ "complexity": "O(1)",
+ "group": "generic",
+ "since": "7.2.0",
+ "arity": 4,
+ "function": "waitaofCommand",
+ "command_flags": [
+ "NOSCRIPT"
+ ],
+ "acl_categories": [
+ "CONNECTION"
+ ],
+ "command_tips": [
+ "REQUEST_POLICY:ALL_SHARDS",
+ "RESPONSE_POLICY:AGG_MIN"
+ ],
+ "reply_schema": {
+ "type": "array",
+ "description": "Number of local and remote AOF files in sync.",
+ "minItems": 2,
+ "maxItems": 2,
+ "items": [
+ {
+ "description": "Number of local AOF files.",
+ "type": "integer",
+ "minimum": 0
+ },
+ {
+ "description": "Number of replica AOF files.",
+ "type": "number",
+ "minimum": 0
+ }
+ ]
+ },
+ "arguments": [
+ {
+ "name": "numlocal",
+ "type": "integer"
+ },
+ {
+ "name": "numreplicas",
+ "type": "integer"
+ },
+ {
+ "name": "timeout",
+ "type": "integer"
+ }
+ ]
+ }
+}
diff --git a/src/config.c b/src/config.c
index bd62ca3db..94fc039e2 100644
--- a/src/config.c
+++ b/src/config.c
@@ -31,6 +31,7 @@
#include "server.h"
#include "cluster.h"
#include "connection.h"
+#include "bio.h"
#include <fcntl.h>
#include <sys/stat.h>
@@ -2558,6 +2559,17 @@ int updateRequirePass(const char **err) {
return 1;
}
+int updateAppendFsync(const char **err) {
+ UNUSED(err);
+ if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
+ /* Wait for all bio jobs related to AOF to drain before proceeding. This prevents a race
+ * between updates to `fsynced_reploff_pending` done in the main thread and those done on the
+ * worker thread. */
+ bioDrainWorker(BIO_AOF_FSYNC);
+ }
+ return 1;
+}
+
/* applyBind affects both TCP and TLS (if enabled) together */
static int applyBind(const char **err) {
connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET);
@@ -3098,7 +3110,7 @@ standardConfig static_configs[] = {
createEnumConfig("repl-diskless-load", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, repl_diskless_load_enum, server.repl_diskless_load, REPL_DISKLESS_LOAD_DISABLED, NULL, NULL),
createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, server.verbosity, LL_NOTICE, NULL, NULL),
createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, server.maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL),
- createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, server.aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL),
+ createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, server.aof_fsync, AOF_FSYNC_EVERYSEC, NULL, updateAppendFsync),
createEnumConfig("oom-score-adj", NULL, MODIFIABLE_CONFIG, oom_score_adj_enum, server.oom_score_adj, OOM_SCORE_ADJ_NO, NULL, updateOOMScoreAdj),
createEnumConfig("acl-pubsub-default", NULL, MODIFIABLE_CONFIG, acl_pubsub_default_enum, server.acl_pubsub_default, 0, NULL, NULL),
createEnumConfig("sanitize-dump-payload", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, sanitize_dump_payload_enum, server.sanitize_dump_payload, SANITIZE_DUMP_NO, NULL, NULL),
diff --git a/src/networking.c b/src/networking.c
index 634c1fa89..bc5d8128f 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -180,6 +180,7 @@ client *createClient(connection *conn) {
c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
+ c->repl_aof_off = 0;
c->repl_last_partial_write = 0;
c->slave_listening_port = 0;
c->slave_addr = NULL;
diff --git a/src/replication.c b/src/replication.c
index 5aef23698..14f32a869 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -451,7 +451,13 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
- if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
+ if (server.repl_backlog == NULL && listLength(slaves) == 0) {
+ /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
+ * even when there's no replication active. This code will not be reached if AOF
+ * is also disabled. */
+ server.master_repl_offset += 1;
+ return;
+ }
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
@@ -1140,11 +1146,12 @@ void syncCommand(client *c) {
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
- * - ack <offset>
+ * - ack <offset> [fack <aofofs>]
* Replica informs the master the amount of replication stream that it
- * processed so far.
+ * processed so far, and optionally the replication offset fsynced to the AOF file.
+ * This special pattern doesn't reply to the caller.
*
- * - getack
+ * - getack <dummy>
* Unlike other subcommands, this is used by master to get the replication
* offset from a replica.
*
@@ -1201,6 +1208,12 @@ void replconfCommand(client *c) {
return;
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
+ if (c->argc > j+3 && !strcasecmp(c->argv[j+2]->ptr,"fack")) {
+ if ((getLongLongFromObject(c->argv[j+3], &offset) != C_OK))
+ return;
+ if (offset > c->repl_aof_off)
+ c->repl_aof_off = offset;
+ }
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
@@ -3248,11 +3261,16 @@ void replicationSendAck(void) {
client *c = server.master;
if (c != NULL) {
+ int send_fack = server.fsynced_reploff != -1;
c->flags |= CLIENT_MASTER_FORCE_REPLY;
- addReplyArrayLen(c,3);
+ addReplyArrayLen(c,send_fack ? 5 : 3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
+ if (send_fack) {
+ addReplyBulkCString(c,"FACK");
+ addReplyBulkLongLong(c,server.fsynced_reploff);
+ }
c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}
}
@@ -3487,6 +3505,23 @@ int replicationCountAcksByOffset(long long offset) {
return count;
}
+/* Return the number of replicas that already acknowledged the specified
+ * replication offset being AOF fsynced. */
+int replicationCountAOFAcksByOffset(long long offset) {
+ listIter li;
+ listNode *ln;
+ int count = 0;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+
+ if (slave->replstate != SLAVE_STATE_ONLINE) continue;
+ if (slave->repl_aof_off >= offset) count++;
+ }
+ return count;
+}
+
/* WAIT for N replicas to acknowledge the processing of our latest
* write command (and all the previous commands). */
void waitCommand(client *c) {
@@ -3507,7 +3542,7 @@ void waitCommand(client *c) {
/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
- if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
+ if (ackreplicas >= numreplicas || c->flags & CLIENT_DENY_BLOCKING) {
addReplyLongLong(c,ackreplicas);
return;
}
@@ -3521,6 +3556,48 @@ void waitCommand(client *c) {
replicationRequestAckFromSlaves();
}
+/* WAIT for N replicas and / or local master to acknowledge our latest
+ * write command got synced to the disk. */
+void waitaofCommand(client *c) {
+ mstime_t timeout;
+ long numreplicas, numlocal, ackreplicas, acklocal;
+
+ /* Argument parsing. */
+ if (getRangeLongFromObjectOrReply(c,c->argv[1],0,1,&numlocal,NULL) != C_OK)
+ return;
+ if (getPositiveLongFromObjectOrReply(c,c->argv[2],&numreplicas,NULL) != C_OK)
+ return;
+ if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS) != C_OK)
+ return;
+
+ if (server.masterhost) {
+ addReplyError(c,"WAITAOF cannot be used with replica instances. Please also note that writes to replicas are just local and are not propagated.");
+ return;
+ }
+ if (numlocal && !server.aof_enabled) {
+ addReplyError(c,"WAITAOF cannot be used when appendonly is disabled");
+ return;
+ }
+
+ /* First try without blocking at all. */
+ ackreplicas = replicationCountAOFAcksByOffset(c->woff);
+ acklocal = server.fsynced_reploff >= c->woff;
+ if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyArrayLen(c,2);
+ addReplyLongLong(c,acklocal);
+ addReplyLongLong(c,ackreplicas);
+ return;
+ }
+
+ /* Otherwise block the client and put it into our list of clients
+ * waiting for ack from slaves. */
+ blockForAofFsync(c,timeout,c->woff,numlocal,numreplicas);
+
+ /* Make sure that the server will send an ACK request to all the slaves
+ * before returning to the event loop. */
+ replicationRequestAckFromSlaves();
+}
+
/* This is called by unblockClient() to perform the blocking op type
* specific cleanup. We just remove the client from the list of clients
* waiting for replica acks. Never call it directly, call unblockClient()
@@ -3531,8 +3608,8 @@ void unblockClientWaitingReplicas(client *c) {
listDelNode(server.clients_waiting_acks,ln);
}
-/* Check if there are clients blocked in WAIT that can be unblocked since
- * we received enough ACKs from slaves. */
+/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
+ * since we received enough ACKs from slaves. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
int last_numreplicas = 0;
@@ -3543,6 +3620,13 @@ void processClientsWaitingReplicas(void) {
listRewind(server.clients_waiting_acks,&li);
while((ln = listNext(&li))) {
client *c = ln->value;
+ int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
+
+ if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
+ unblockClient(c);
+ addReplyError(c,"WAITAOF cannot be used when appendonly is disabled");
+ return;
+ }
/* Every time we find a client that is satisfied for a given
* offset and number of replicas, we remember it so the next client
@@ -3555,13 +3639,28 @@ void processClientsWaitingReplicas(void) {
addReplyLongLong(c,last_numreplicas);
unblockClient(c);
} else {
- int numreplicas = replicationCountAcksByOffset(c->bstate.reploffset);
+ int numreplicas = is_wait_aof ?
+ replicationCountAOFAcksByOffset(c->bstate.reploffset) :
+ replicationCountAcksByOffset(c->bstate.reploffset);
if (numreplicas >= c->bstate.numreplicas) {
last_offset = c->bstate.reploffset;
last_numreplicas = numreplicas;
+
+ /* Check if the local constraint of WAITAOF is served */
+ int numlocal = server.fsynced_reploff >= c->bstate.reploffset;
+ if (is_wait_aof && numlocal < c->bstate.numlocal)
+ continue;
+
+ if (is_wait_aof) {
+ /* WAITAOF has an array reply*/
+ addReplyArrayLen(c,2);
+ addReplyLongLong(c,numlocal);
+ addReplyLongLong(c,numreplicas);
+ } else {
+ addReplyLongLong(c,numreplicas);
+ }
/* Reply before unblocking, because unblock client calls reqresAppendResponse */
- addReplyLongLong(c,numreplicas);
unblockClient(c);
}
}
diff --git a/src/server.c b/src/server.c
index b42f22616..47988dec9 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1642,7 +1642,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Unblock all the clients blocked for synchronous replication
- * in WAIT. */
+ * in WAIT or WAITAOF. */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
@@ -1702,6 +1702,15 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
+ /* Update the fsynced replica offset.
+ * If an initial rewrite is in progress then not all data is guaranteed to have actually been
+ * persisted to disk yet, so we cannot update the field. We will wait for the rewrite to complete. */
+ if (server.aof_state == AOF_ON && server.fsynced_reploff != -1) {
+ long long fsynced_reploff_pending;
+ atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
+ server.fsynced_reploff = fsynced_reploff_pending;
+ }
+
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
@@ -2030,6 +2039,7 @@ void initServerConfig(void) {
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.master_repl_offset = 0;
+ server.fsynced_reploff_pending = 0;
/* Replication partial resync backlog */
server.repl_backlog = NULL;
@@ -2508,6 +2518,7 @@ void initServer(void) {
/* Initialization after setting defaults from the config system. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
+ server.fsynced_reploff = server.aof_enabled ? 0 : -1;
server.hz = server.config_hz;
server.pid = getpid();
server.in_fork_child = CHILD_TYPE_NONE;
@@ -3468,6 +3479,7 @@ void call(client *c, int flags) {
/* Call the command. */
dirty = server.dirty;
+ long long old_master_repl_offset = server.master_repl_offset;
incrCommandStatsOnError(NULL, 0);
const long long call_timer = ustime();
@@ -3636,6 +3648,11 @@ void call(client *c, int flags) {
/* Do some maintenance job and cleanup */
afterCommand(c);
+ /* Remember the replication offset of the client, right after its last
+ * command that resulted in propagation. */
+ if (old_master_repl_offset != server.master_repl_offset)
+ c->woff = server.master_repl_offset;
+
/* Client pause takes effect after a transaction has finished. This needs
* to be located after everything is propagated. */
if (!server.in_exec && server.client_pause_in_transaction) {
@@ -4099,7 +4116,6 @@ int processCommand(client *c) {
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
- c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
diff --git a/src/server.h b/src/server.h
index a3d9aa088..ed39b1b96 100644
--- a/src/server.h
+++ b/src/server.h
@@ -399,6 +399,7 @@ typedef enum blocking_type {
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
+ BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
@@ -1010,6 +1011,7 @@ typedef struct blockingState {
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
+ int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
/* BLOCKED_MODULE */
@@ -1175,6 +1177,7 @@ typedef struct client {
long long reploff; /* Applied replication offset if this is a master. */
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
+ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
@@ -1802,6 +1805,11 @@ struct redisServer {
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
long long master_repl_offset; /* My current replication offset */
long long second_replid_offset; /* Accept offsets up to this for replid2. */
+ redisAtomic long long fsynced_reploff_pending;/* Largest replication offset to
+ * potentially have been fsynced, applied to
+ fsynced_reploff only when AOF state is AOF_ON
+ (not during the initial rewrite) */
+ long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */
int slaveseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
replBacklog *repl_backlog; /* Replication backlog for partial syncs */
@@ -1859,7 +1867,7 @@ struct redisServer {
long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Synchronous replication. */
- list *clients_waiting_acks; /* Clients waiting in WAIT command. */
+ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
@@ -2789,6 +2797,7 @@ int checkGoodReplicasStatus(void);
void processClientsWaitingReplicas(void);
void unblockClientWaitingReplicas(client *c);
int replicationCountAcksByOffset(long long offset);
+int replicationCountAOFAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
@@ -3352,6 +3361,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
+void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void signalDeletedKeyAsReady(redisDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with);
@@ -3616,6 +3626,7 @@ void bitcountCommand(client *c);
void bitposCommand(client *c);
void replconfCommand(client *c);
void waitCommand(client *c);
+void waitaofCommand(client *c);
void georadiusbymemberCommand(client *c);
void georadiusbymemberroCommand(client *c);
void georadiusCommand(client *c);
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index b2c9bdf7a..062f33a14 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -878,9 +878,9 @@ proc debug_digest {{level 0}} {
r $level debug digest
}
-proc wait_for_blocked_client {} {
+proc wait_for_blocked_client {{idx 0}} {
wait_for_condition 50 100 {
- [s blocked_clients] ne 0
+ [s $idx blocked_clients] ne 0
} else {
fail "no blocked clients"
}
diff --git a/tests/unit/wait.tcl b/tests/unit/wait.tcl
index 0ab36a0e6..7e040fbc0 100644
--- a/tests/unit/wait.tcl
+++ b/tests/unit/wait.tcl
@@ -69,3 +69,234 @@ start_server {} {
assert {[$master wait 1 1000] == 1}
}
}}
+
+
+tags {"wait aof network external:skip"} {
+ start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} {
+ set master [srv 0 client]
+
+ test {WAITAOF local copy before fsync} {
+ r config set appendfsync no
+ $master incr foo
+ assert_equal [$master waitaof 1 0 50] {0 0} ;# exits on timeout
+ r config set appendfsync everysec
+ }
+
+ test {WAITAOF local copy everysec} {
+ $master incr foo
+ assert_equal [$master waitaof 1 0 0] {1 0}
+ }
+
+ test {WAITAOF local copy with appendfsync always} {
+ r config set appendfsync always
+ $master incr foo
+ assert_equal [$master waitaof 1 0 0] {1 0}
+ r config set appendfsync everysec
+ }
+
+ test {WAITAOF local wait and then stop aof} {
+ set rd [redis_deferring_client]
+ $rd incr foo
+ $rd read
+ $rd waitaof 1 0 0
+ wait_for_blocked_client
+ r config set appendonly no ;# this should release the blocked client as an error
+ assert_error {ERR WAITAOF cannot be used when appendonly is disabled} {$rd read}
+ $rd close
+ }
+
+ test {WAITAOF local on server with aof disabled} {
+ $master incr foo
+ assert_error {ERR WAITAOF cannot be used when appendonly is disabled} {$master waitaof 1 0 0}
+ }
+
+ $master config set appendonly yes
+ waitForBgrewriteaof $master
+
+ start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} {
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ set replica_pid [srv 0 pid]
+
+ # make sure the master always fsyncs first (easier to test)
+ $master config set appendfsync always
+ $replica config set appendfsync no
+
+ test {WAITAOF on demoted master gets unblocked with an error} {
+ set rd [redis_deferring_client]
+ $rd incr foo
+ $rd read
+ $rd waitaof 0 1 0
+ wait_for_blocked_client
+ $replica replicaof $master_host $master_port
+ assert_error {UNBLOCKED force unblock from blocking operation,*} {$rd read}
+ $rd close
+ }
+
+ wait_for_ofs_sync $master $replica
+
+ test {WAITAOF replica copy before fsync} {
+ $master incr foo
+ assert_equal [$master waitaof 0 1 50] {1 0} ;# exits on timeout
+ }
+ $replica config set appendfsync everysec
+
+ test {WAITAOF replica copy everysec} {
+ $master incr foo
+ assert_equal [$master waitaof 0 1 0] {1 1}
+ }
+
+ test {WAITAOF replica copy appendfsync always} {
+ $replica config set appendfsync always
+ $master incr foo
+ assert_equal [$master waitaof 0 1 0] {1 1}
+ $replica config set appendfsync everysec
+ }
+
+ test {WAITAOF replica copy if replica is blocked} {
+ exec kill -SIGSTOP $replica_pid
+ $master incr foo
+ assert_equal [$master waitaof 0 1 50] {1 0} ;# exits on timeout
+ exec kill -SIGCONT $replica_pid
+ assert_equal [$master waitaof 0 1 0] {1 1}
+ }
+
+ test {WAITAOF on promoted replica} {
+ $replica replicaof no one
+ $replica incr foo
+ assert_equal [$replica waitaof 1 0 0] {1 0}
+ }
+
+ test {WAITAOF master that loses a replica and backlog is dropped} {
+ $master config set repl-backlog-ttl 1
+ after 2000 ;# wait for backlog to expire
+ $master incr foo
+ assert_equal [$master waitaof 1 0 0] {1 0}
+ }
+
+ test {WAITAOF master without backlog, wait is released when the replica finishes full-sync} {
+ set rd [redis_deferring_client -1]
+ $rd incr foo
+ $rd read
+ $rd waitaof 0 1 0
+ wait_for_blocked_client -1
+ $replica replicaof $master_host $master_port
+ assert_equal [$rd read] {1 1}
+ $rd close
+ }
+
+ test {WAITAOF master isn't configured to do AOF} {
+ $master config set appendonly no
+ $master incr foo
+ assert_equal [$master waitaof 0 1 0] {0 1}
+ }
+
+ test {WAITAOF replica isn't configured to do AOF} {
+ $master config set appendonly yes
+ waitForBgrewriteaof $master
+ $replica config set appendonly no
+ $master incr foo
+ assert_equal [$master waitaof 1 0 0] {1 0}
+ }
+
+ test {WAITAOF both local and replica got AOF enabled at runtime} {
+ $replica config set appendonly yes
+ waitForBgrewriteaof $replica
+ $master incr foo
+ assert_equal [$master waitaof 1 1 0] {1 1}
+ }
+
+ test {WAITAOF master sends PING after last write} {
+ $master config set repl-ping-replica-period 1
+ $master incr foo
+ after 1200 ;# wait for PING
+ $master get foo
+ assert_equal [$master waitaof 1 1 0] {1 1}
+ $master config set repl-ping-replica-period 10
+ }
+
+ test {WAITAOF master client didn't send any write command} {
+ $master config set repl-ping-replica-period 1
+ set client [redis_client -1]
+ after 1200 ;# wait for PING
+ assert_equal [$master waitaof 1 1 0] {1 1}
+ $client close
+ $master config set repl-ping-replica-period 10
+ }
+
+ test {WAITAOF master client didn't send any command} {
+ $master config set repl-ping-replica-period 1
+ set client [redis [srv -1 "host"] [srv -1 "port"] 0 $::tls]
+ after 1200 ;# wait for PING
+ assert_equal [$master waitaof 1 1 0] {1 1}
+ $client close
+ $master config set repl-ping-replica-period 10
+ }
+
+ foreach fsync {no everysec always} {
+ test "WAITAOF when replica switches between masters, fsync: $fsync" {
+ # test a case where a replica is moved from one master to the other
+ # between two replication streams with different offsets that should
+ # not be mixed. done to smoke-test race conditions with bio thread.
+ start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} {
+ start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} {
+ set master2 [srv -1 client]
+ set master2_host [srv -1 host]
+ set master2_port [srv -1 port]
+ set replica2 [srv 0 client]
+ set replica2_host [srv 0 host]
+ set replica2_port [srv 0 port]
+ set replica2_pid [srv 0 pid]
+
+ $replica2 replicaof $master2_host $master2_port
+ wait_for_ofs_sync $master2 $replica2
+
+ $master config set appendfsync $fsync
+ $master2 config set appendfsync $fsync
+ $replica config set appendfsync $fsync
+ $replica2 config set appendfsync $fsync
+ if {$fsync eq "no"} {
+ after 2000 ;# wait for any previous fsync to finish
+ # can't afford "no" on the masters
+ $master config set appendfsync always
+ $master2 config set appendfsync always
+ } elseif {$fsync eq "everysec"} {
+ after 990 ;# hoping to hit a race
+ }
+
+ # add some writes and block a client on each master
+ set rd [redis_deferring_client -3]
+ set rd2 [redis_deferring_client -1]
+ $rd set boo 11
+ $rd2 set boo 22
+ $rd read
+ $rd2 read
+ $rd waitaof 1 1 0
+ $rd2 waitaof 1 1 0
+
+ if {$fsync eq "no"} {
+ # since appendfsync is disabled in the replicas, the client
+ # will get released only with full sync
+ wait_for_blocked_client -1
+ wait_for_blocked_client -3
+ }
+ # switch between the two replicas
+ $replica2 replicaof $master_host $master_port
+ $replica replicaof $master2_host $master2_port
+ assert_equal [$rd read] {1 1}
+ assert_equal [$rd2 read] {1 1}
+ $rd close
+ $rd2 close
+
+ assert_equal [$replica get boo] 22
+ assert_equal [$replica2 get boo] 11
+ }
+ }
+ }
+ }
+ }
+ }
+}