summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorSlava Koyfman <slavak@users.noreply.github.com>2023-03-14 20:26:21 +0200
committerGitHub <noreply@github.com>2023-03-14 20:26:21 +0200
commit9344f654c6227f30adf8bafd49b4218357c438b7 (patch)
tree89dd758fad385c62ccb166d7f8971eb93589a215 /src/blocked.c
parent7997874f4d2be9da1e26c77804569b0057c1e0a2 (diff)
downloadredis-9344f654c6227f30adf8bafd49b4218357c438b7.tar.gz
Implementing the WAITAOF command (issue #10505) (#11713)
Implementing the WAITAOF functionality which would allow the user to block until a specified number of Redises have fsynced all previous write commands to the AOF. Syntax: `WAITAOF <num_local> <num_replicas> <timeout>` Response: Array containing two elements: num_local, num_replicas num_local is always either 0 or 1 representing the local AOF on the master. num_replicas is the number of replicas that acknowledged the a replication offset of the last write being fsynced to the AOF. Returns an error when called on replicas, or when called with non-zero num_local on a master with AOF disabled, in all other cases the response just contains number of fsync copies. Main changes: * Added code to keep track of replication offsets that are confirmed to have been fsynced to disk. * Keep advancing master_repl_offset even when replication is disabled (and there's no replication backlog, only if there's an AOF enabled). This way we can use this command and it's mechanisms even when replication is disabled. * Extend REPLCONF ACK to `REPLCONF ACK <ofs> FACK <ofs>`, the FACK will be appended only if there's an AOF on the replica, and already ignored on old masters (thus backwards compatible) * WAIT now no longer wait for the replication offset after your last command, but rather the replication offset after your last write (or read command that caused propagation, e.g. lazy expiry). Unrelated changes: * WAIT command respects CLIENT_DENY_BLOCKING (not just CLIENT_MULTI) Implementation details: * Add an atomic var named `fsynced_reploff_pending` that's updated (usually by the bio thread) and later copied to the main `fsynced_reploff` variable (only if the AOF base file exists). I.e. during the initial AOF rewrite it will not be used as the fsynced offset since the AOF base is still missing. * Replace close+fsync bio job with new BIO_CLOSE_AOF (AOF specific) job that will also update fsync offset the field. * Handle all AOF jobs (BIO_CLOSE_AOF, BIO_AOF_FSYNC) in the same bio worker thread, to impose ordering on their execution. This solves a race condition where a job could set `fsynced_reploff_pending` to a higher value than another pending fsync job, resulting in indicating an offset for which parts of the data have not yet actually been fsynced. Imposing an ordering on the jobs guarantees that fsync jobs are executed in increasing order of replication offset. * Drain bio jobs when switching `appendfsync` to "always" This should prevent a write race between updates to `fsynced_reploff_pending` in the main thread (`flushAppendOnlyFile` when set to ALWAYS fsync), and those done in the bio thread. * Drain the pending fsync when starting over a new AOF to avoid race conditions with the previous AOF offsets overriding the new one (e.g. after switching to replicate from a new master). * Make sure to update the fsynced offset at the end of the initial AOF rewrite. a must in case there are no additional writes that trigger a periodic fsync, specifically for a replica that does a full sync. Limitations: It is possible to write a module and a Lua script that propagate to the AOF and doesn't propagate to the replication stream. see REDISMODULE_ARGV_NO_REPLICAS and luaRedisSetReplCommand. These features are incompatible with the WAITAOF command, and can result in two bad cases. The scenario is that the user executes command that only propagates to AOF, and then immediately issues a WAITAOF, and there's no further writes on the replication stream after that. 1. if the the last thing that happened on the replication stream is a PING (which increased the replication offset but won't trigger an fsync on the replica), then the client would hang forever (will wait for an fack that the replica will never send sine it doesn't trigger any fsyncs). 2. if the last thing that happened is a write command that got propagated properly, then WAITAOF will be released immediately, without waiting for an fsync (since the offset didn't change) Refactoring: * Plumbing to allow bio worker to handle multiple job types This introduces infrastructure necessary to allow BIO workers to not have a 1-1 mapping of worker to job-type. This allows in the future to assign multiple job types to a single worker, either as a performance/resource optimization, or as a way of enforcing ordering between specific classes of jobs. Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c16
1 files changed, 15 insertions, 1 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 753442c2b..1f5fba01b 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -177,7 +177,7 @@ void unblockClient(client *c) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
- } else if (c->bstate.btype == BLOCKED_WAIT) {
+ } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
@@ -225,6 +225,10 @@ void replyToBlockedClientTimedOut(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset));
+ } else if (c->bstate.btype == BLOCKED_WAITAOF) {
+ addReplyArrayLen(c,2);
+ addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
+ addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
@@ -583,6 +587,16 @@ void blockForReplication(client *c, mstime_t timeout, long long offset, long num
blockClient(c,BLOCKED_WAIT);
}
+/* block a client due to waitaof command */
+void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
+ c->bstate.timeout = timeout;
+ c->bstate.reploffset = offset;
+ c->bstate.numreplicas = numreplicas;
+ c->bstate.numlocal = numlocal;
+ listAddNodeHead(server.clients_waiting_acks,c);
+ blockClient(c,BLOCKED_WAITAOF);
+}
+
/* Postpone client from executing a command. For example the server might be busy
* requesting to avoid processing clients commands which will be processed later
* when the it is ready to accept them. */