diff options
author | antirez <antirez@gmail.com> | 2020-05-27 12:39:02 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-05-27 12:39:02 +0200 |
commit | f135aef0dbb3ba5033210a2bdbe41196beefd025 (patch) | |
tree | 249e36d403decb295122a92053268747477f11cb | |
parent | 32378b7fad48bc6c2fc7293ad827fc8674485928 (diff) | |
download | redis-f135aef0dbb3ba5033210a2bdbe41196beefd025.tar.gz |
Revert "Keep track of meaningful replication offset in replicas too"
This reverts commit 4447ddc8bb36879db9fe49498165b360bf35ba1b.
-rw-r--r-- | src/blocked.c | 2 | ||||
-rw-r--r-- | src/networking.c | 93 | ||||
-rw-r--r-- | src/replication.c | 65 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | tests/integration/psync2.tcl | 111 |
5 files changed, 76 insertions, 196 deletions
diff --git a/src/blocked.c b/src/blocked.c index 92f1cee65..4af79d886 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -110,7 +110,7 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c); + processInputBufferAndReplicate(c); } } } diff --git a/src/networking.c b/src/networking.c index a8ea2ff94..60c3fb067 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1749,55 +1749,12 @@ int processMultibulkBuffer(client *c) { return C_ERR; } -/* Perform necessary tasks after a command was executed: - * - * 1. The client is reset unless there are reasons to avoid doing it. - * 2. In the case of master clients, the replication offset is updated. - * 3. Propagate commands we got from our master to replicas down the line. */ -void commandProcessed(client *c) { - int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand; - long long prev_offset = c->reploff; - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) - { - resetClient(c); - } - - /* If the client is a master we need to compute the difference - * between the applied offset before and after processing the buffer, - * to understand how much of the replication stream was actually - * applied to the master state: this quantity, and its corresponding - * part of the replication stream, will be propagated to the - * sub-replicas and to the replication backlog. */ - if (c->flags & CLIENT_MASTER) { - long long applied = c->reploff - prev_offset; - long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset; - if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); - sdsrange(c->pending_querybuf,applied,-1); - } - /* The server.master_repl_meaningful_offset variable represents - * the offset of the replication stream without the pending PINGs. */ - if (cmd_is_ping) - server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset; - } -} - /* This function calls processCommand(), but also performs a few sub tasks - * for the client that are useful in that context: + * that are useful in that context: * * 1. It sets the current client to the client 'c'. - * 2. calls commandProcessed() if the command was handled. + * 2. In the case of master clients, the replication offset is updated. + * 3. The client is reset unless there are reasons to avoid doing it. * * The function returns C_ERR in case the client was freed as a side effect * of processing the command, otherwise C_OK is returned. */ @@ -1805,7 +1762,20 @@ int processCommandAndResetClient(client *c) { int deadclient = 0; server.current_client = c; if (processCommand(c) == C_OK) { - commandProcessed(c); + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. + * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } } if (server.current_client == NULL) deadclient = 1; server.current_client = NULL; @@ -1902,6 +1872,31 @@ void processInputBuffer(client *c) { } } +/* This is a wrapper for processInputBuffer that also cares about handling + * the replication forwarding to the sub-replicas, in case the client 'c' + * is flagged as master. Usually you want to call this instead of the + * raw processInputBuffer(). */ +void processInputBufferAndReplicate(client *c) { + if (!(c->flags & CLIENT_MASTER)) { + processInputBuffer(c); + } else { + /* If the client is a master we need to compute the difference + * between the applied offset before and after processing the buffer, + * to understand how much of the replication stream was actually + * applied to the master state: this quantity, and its corresponding + * part of the replication stream, will be propagated to the + * sub-replicas and to the replication backlog. */ + size_t prev_offset = c->reploff; + processInputBuffer(c); + size_t applied = c->reploff - prev_offset; + if (applied) { + replicationFeedSlavesFromMasterStream(server.slaves, + c->pending_querybuf, applied); + sdsrange(c->pending_querybuf,applied,-1); + } + } +} + void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, readlen; @@ -1969,7 +1964,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ - processInputBuffer(c); + processInputBufferAndReplicate(c); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -3198,7 +3193,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { continue; } } - processInputBuffer(c); + processInputBufferAndReplicate(c); } return processed; } diff --git a/src/replication.c b/src/replication.c index 99f233380..12500c0af 100644 --- a/src/replication.c +++ b/src/replication.c @@ -39,7 +39,6 @@ #include <sys/socket.h> #include <sys/stat.h> -long long adjustMeaningfulReplOffset(); void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); @@ -2755,10 +2754,6 @@ void replicationCacheMaster(client *c) { * pending outputs to the master. */ sdsclear(server.master->querybuf); sdsclear(server.master->pending_querybuf); - - /* Adjust reploff and read_reploff to the last meaningful offset we - * executed. This is the offset the replica will use for future PSYNC. */ - server.master->reploff = adjustMeaningfulReplOffset(); server.master->read_reploff = server.master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); @@ -2783,15 +2778,33 @@ void replicationCacheMaster(client *c) { replicationHandleMasterDisconnection(); } -/* If the "meaningful" offset, that is the offset without the final PINGs - * in the stream, is different than the last offset, use it instead: - * often when the master is no longer reachable, replicas will never - * receive the PINGs, however the master will end with an incremented - * offset because of the PINGs and will not be able to incrementally - * PSYNC with the new master. - * This function trims the replication backlog when needed, and returns - * the offset to be used for future partial sync. */ -long long adjustMeaningfulReplOffset() { +/* This function is called when a master is turend into a slave, in order to + * create from scratch a cached master for the new client, that will allow + * to PSYNC with the slave that was promoted as the new master after a + * failover. + * + * Assuming this instance was previously the master instance of the new master, + * the new master will accept its replication ID, and potentiall also the + * current offset if no data was lost during the failover. So we use our + * current replication ID and offset in order to synthesize a cached master. */ +void replicationCacheMasterUsingMyself(void) { + serverLog(LL_NOTICE, + "Before turning into a replica, using my own master parameters " + "to synthesize a cached master: I may be able to synchronize with " + "the new master with just a partial transfer."); + + /* This will be used to populate the field server.master->reploff + * by replicationCreateMasterClient(). We'll later set the created + * master as server.cached_master, so the replica will use such + * offset for PSYNC. */ + server.master_initial_offset = server.master_repl_offset; + + /* However if the "meaningful" offset, that is the offset without + * the final PINGs in the stream, is different, use this instead: + * often when the master is no longer reachable, replicas will never + * receive the PINGs, however the master will end with an incremented + * offset because of the PINGs and will not be able to incrementally + * PSYNC with the new master. */ if (server.master_repl_offset > server.master_repl_meaningful_offset) { long long delta = server.master_repl_offset - server.master_repl_meaningful_offset; @@ -2801,6 +2814,7 @@ long long adjustMeaningfulReplOffset() { server.master_repl_meaningful_offset, server.master_repl_offset, delta); + server.master_initial_offset = server.master_repl_meaningful_offset; server.master_repl_offset = server.master_repl_meaningful_offset; if (server.repl_backlog_histlen <= delta) { server.repl_backlog_histlen = 0; @@ -2812,29 +2826,6 @@ long long adjustMeaningfulReplOffset() { server.repl_backlog_size; } } - return server.master_repl_offset; -} - -/* This function is called when a master is turend into a slave, in order to - * create from scratch a cached master for the new client, that will allow - * to PSYNC with the slave that was promoted as the new master after a - * failover. - * - * Assuming this instance was previously the master instance of the new master, - * the new master will accept its replication ID, and potentiall also the - * current offset if no data was lost during the failover. So we use our - * current replication ID and offset in order to synthesize a cached master. */ -void replicationCacheMasterUsingMyself(void) { - serverLog(LL_NOTICE, - "Before turning into a replica, using my own master parameters " - "to synthesize a cached master: I may be able to synchronize with " - "the new master with just a partial transfer."); - - /* This will be used to populate the field server.master->reploff - * by replicationCreateMasterClient(). We'll later set the created - * master as server.cached_master, so the replica will use such - * offset for PSYNC. */ - server.master_initial_offset = adjustMeaningfulReplOffset(); /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ diff --git a/src/server.h b/src/server.h index f835bf5e9..4b4fd4e0d 100644 --- a/src/server.h +++ b/src/server.h @@ -1608,6 +1608,7 @@ void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); void processInputBuffer(client *c); +void processInputBufferAndReplicate(client *c); void processGopherRequest(client *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 29a880f99..40ca55607 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -117,7 +117,6 @@ start_server {} { set used [list $master_id] test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" { $R($master_id) slaveof no one - $R($master_id) config set repl-ping-replica-period 1 ;# increse the chance that random ping will cause issues if {$counter_value == 0} { $R($master_id) set x $counter_value } @@ -259,14 +258,9 @@ start_server {} { $R($j) slaveof $master_host $master_port } - # Wait for replicas to sync. it is not enough to just wait for connected_slaves==4 - # since we might do the check before the master realized that they're disconnected + # Wait for slaves to sync wait_for_condition 50 1000 { - [status $R($master_id) connected_slaves] == 4 && - [status $R([expr {($master_id+1)%5}]) master_link_status] == "up" && - [status $R([expr {($master_id+2)%5}]) master_link_status] == "up" && - [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" && - [status $R([expr {($master_id+4)%5}]) master_link_status] == "up" + [status $R($master_id) connected_slaves] == 4 } else { show_cluster_status fail "Replica not reconnecting" @@ -278,7 +272,6 @@ start_server {} { set slave_id [expr {($master_id+1)%5}] set sync_count [status $R($master_id) sync_full] set sync_partial [status $R($master_id) sync_partial_ok] - set sync_partial_err [status $R($master_id) sync_partial_err] catch { $R($slave_id) config rewrite $R($slave_id) debug restart @@ -370,103 +363,3 @@ start_server {} { } }}}}} - -start_server {tags {"psync2"}} { -start_server {} { -start_server {} { -start_server {} { -start_server {} { - test {pings at the end of replication stream are ignored for psync} { - set master [srv -4 client] - set master_host [srv -4 host] - set master_port [srv -4 port] - set replica1 [srv -3 client] - set replica2 [srv -2 client] - set replica3 [srv -1 client] - set replica4 [srv -0 client] - - $replica1 replicaof $master_host $master_port - $replica2 replicaof $master_host $master_port - $replica3 replicaof $master_host $master_port - $replica4 replicaof $master_host $master_port - wait_for_condition 50 1000 { - [status $master connected_slaves] == 4 - } else { - fail "replicas didn't connect" - } - - $master incr x - wait_for_condition 50 1000 { - [$replica1 get x] == 1 && [$replica2 get x] == 1 && - [$replica3 get x] == 1 && [$replica4 get x] == 1 - } else { - fail "replicas didn't get incr" - } - - # disconnect replica1 and replica2 - # and wait for the master to send a ping to replica3 and replica4 - $replica1 replicaof no one - $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id - $master config set repl-ping-replica-period 1 - after 1500 - - # make everyone sync from the replica1 that didn't get the last ping from the old master - # replica4 will keep syncing from the old master which now syncs from replica1 - # and replica2 will re-connect to the old master (which went back in time) - set new_master_host [srv -3 host] - set new_master_port [srv -3 port] - $replica3 replicaof $new_master_host $new_master_port - $master replicaof $new_master_host $new_master_port - $replica2 replicaof $master_host $master_port - wait_for_condition 50 1000 { - [status $replica2 master_link_status] == "up" && - [status $replica3 master_link_status] == "up" && - [status $replica4 master_link_status] == "up" && - [status $master master_link_status] == "up" - } else { - fail "replicas didn't connect" - } - - # make sure replication is still alive and kicking - $replica1 incr x - wait_for_condition 50 1000 { - [$replica2 get x] == 2 && - [$replica3 get x] == 2 && - [$replica4 get x] == 2 && - [$master get x] == 2 - } else { - fail "replicas didn't get incr" - } - - # make sure there are full syncs other than the initial ones - assert_equal [status $master sync_full] 4 - assert_equal [status $replica1 sync_full] 0 - assert_equal [status $replica2 sync_full] 0 - assert_equal [status $replica3 sync_full] 0 - assert_equal [status $replica4 sync_full] 0 - - # force psync - $master client kill type master - $replica2 client kill type master - $replica3 client kill type master - $replica4 client kill type master - - # make sure replication is still alive and kicking - $replica1 incr x - wait_for_condition 50 1000 { - [$replica2 get x] == 3 && - [$replica3 get x] == 3 && - [$replica4 get x] == 3 && - [$master get x] == 3 - } else { - fail "replicas didn't get incr" - } - - # make sure there are full syncs other than the initial ones - assert_equal [status $master sync_full] 4 - assert_equal [status $replica1 sync_full] 0 - assert_equal [status $replica2 sync_full] 0 - assert_equal [status $replica3 sync_full] 0 - assert_equal [status $replica4 sync_full] 0 -} -}}}}} |