summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2020-04-23 15:04:42 +0300
committerantirez <antirez@gmail.com>2020-04-27 15:52:49 +0200
commite4d2bb62b2922e549ae4c86f514a4c63aa7323e8 (patch)
tree9cdbc4e130ae3d0645273c2b9ca5795f1c528cc5
parentfea9788cc4ef02dee8b41a21d1f8ae56d79a86cb (diff)
downloadredis-e4d2bb62b2922e549ae4c86f514a4c63aa7323e8.tar.gz
Keep track of meaningful replication offset in replicas too
Now both master and replicas keep track of the last replication offset that contains meaningful data (ignoring the tailing pings), and both trim that tail from the replication backlog, and the offset with which they try to use for psync. the implication is that if someone missed some pings, or even have excessive pings that the promoted replica has, it'll still be able to psync (avoid full sync). the downside (which was already committed) is that replicas running old code may fail to psync, since the promoted replica trims pings form it's backlog. This commit adds a test that reproduces several cases of promotions and demotions with stale and non-stale pings Background: The mearningful offset on the master was added recently to solve a problem were the master is left all alone, injecting PINGs into it's backlog when no one is listening and then gets demoted and tries to replicate from a replica that didn't have any of the PINGs (or at least not the last ones). however, consider this case: master A has two replicas (B and C) replicating directly from it. there's no traffic at all, and also no network issues, just many pings in the tail of the backlog. now B gets promoted, A becomes a replica of B, and C remains a replica of A. when A gets demoted, it trims the pings from its backlog, and successfully replicate from B. however, C is still aware of these PINGs, when it'll disconnect and re-connect to A, it'll ask for something that's not in the backlog anymore (since A trimmed the tail of it's backlog), and be forced to do a full sync (something it didn't have to do before the meaningful offset fix). Besides that, the psync2 test was always failing randomly here and there, it turns out the reason were PINGs. Investigating it shows the following scenario: cycle 1: redis #1 is master, and all the rest are direct replicas of #1 cycle 2: redis #2 is promoted to master, #1 is a replica of #2 and #3 is replica of #1 now we see that when #1 is demoted it prints: 17339:S 21 Apr 2020 11:16:38.523 * Using the meaningful offset 3929963 instead of 3929977 to exclude the final PINGs (14 bytes difference) 17339:S 21 Apr 2020 11:16:39.391 * Trying a partial resynchronization (request e2b3f8817735fdfe5fa4626766daa938b61419e5:3929964). 17339:S 21 Apr 2020 11:16:39.392 * Successful partial resynchronization with master. and when #3 connects to the demoted #2, #2 says: 17339:S 21 Apr 2020 11:16:40.084 * Partial resynchronization not accepted: Requested offset for secondary ID was 3929978, but I can reply up to 3929964 so the issue here is that the meaningful offset feature saved the day for the demoted master (since it needs to sync from a replica that didn't get the last ping), but it didn't help one of the other replicas which did get the last ping.
-rw-r--r--src/blocked.c2
-rw-r--r--src/networking.c93
-rw-r--r--src/replication.c64
-rw-r--r--src/server.h1
-rw-r--r--tests/integration/psync2.tcl144
5 files changed, 212 insertions, 92 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 00cc798d5..045369e93 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -110,7 +110,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) {
- processInputBufferAndReplicate(c);
+ processInputBuffer(c);
}
}
}
diff --git a/src/networking.c b/src/networking.c
index 744979d16..e4d40fdf0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1671,12 +1671,55 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}
+/* Perform necessary tasks after a command was executed:
+ *
+ * 1. The client is reset unless there are reasons to avoid doing it.
+ * 2. In the case of master clients, the replication offset is updated.
+ * 3. Propagate commands we got from our master to replicas down the line. */
+void commandProcessed(client *c) {
+ int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
+ long long prev_offset = c->reploff;
+ if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
+ /* Update the applied replication offset of our master. */
+ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+ }
+
+ /* Don't reset the client structure for clients blocked in a
+ * module blocking command, so that the reply callback will
+ * still be able to access the client argv and argc field.
+ * The client will be reset in unblockClientFromModule(). */
+ if (!(c->flags & CLIENT_BLOCKED) ||
+ c->btype != BLOCKED_MODULE)
+ {
+ resetClient(c);
+ }
+
+ /* If the client is a master we need to compute the difference
+ * between the applied offset before and after processing the buffer,
+ * to understand how much of the replication stream was actually
+ * applied to the master state: this quantity, and its corresponding
+ * part of the replication stream, will be propagated to the
+ * sub-replicas and to the replication backlog. */
+ if (c->flags & CLIENT_MASTER) {
+ long long applied = c->reploff - prev_offset;
+ long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset;
+ if (applied) {
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->pending_querybuf, applied);
+ sdsrange(c->pending_querybuf,applied,-1);
+ }
+ /* The server.master_repl_meaningful_offset variable represents
+ * the offset of the replication stream without the pending PINGs. */
+ if (cmd_is_ping)
+ server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
+ }
+}
+
/* This function calls processCommand(), but also performs a few sub tasks
- * that are useful in that context:
+ * for the client that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
- * 2. In the case of master clients, the replication offset is updated.
- * 3. The client is reset unless there are reasons to avoid doing it.
+ * 2. calls commandProcessed() if the command was handled.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
@@ -1684,20 +1727,7 @@ int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
- if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
- /* Update the applied replication offset of our master. */
- c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
- }
-
- /* Don't reset the client structure for clients blocked in a
- * module blocking command, so that the reply callback will
- * still be able to access the client argv and argc field.
- * The client will be reset in unblockClientFromModule(). */
- if (!(c->flags & CLIENT_BLOCKED) ||
- c->btype != BLOCKED_MODULE)
- {
- resetClient(c);
- }
+ commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
@@ -1794,31 +1824,6 @@ void processInputBuffer(client *c) {
}
}
-/* This is a wrapper for processInputBuffer that also cares about handling
- * the replication forwarding to the sub-replicas, in case the client 'c'
- * is flagged as master. Usually you want to call this instead of the
- * raw processInputBuffer(). */
-void processInputBufferAndReplicate(client *c) {
- if (!(c->flags & CLIENT_MASTER)) {
- processInputBuffer(c);
- } else {
- /* If the client is a master we need to compute the difference
- * between the applied offset before and after processing the buffer,
- * to understand how much of the replication stream was actually
- * applied to the master state: this quantity, and its corresponding
- * part of the replication stream, will be propagated to the
- * sub-replicas and to the replication backlog. */
- size_t prev_offset = c->reploff;
- processInputBuffer(c);
- size_t applied = c->reploff - prev_offset;
- if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->pending_querybuf, applied);
- sdsrange(c->pending_querybuf,applied,-1);
- }
- }
-}
-
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
@@ -1886,7 +1891,7 @@ void readQueryFromClient(connection *conn) {
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
- processInputBufferAndReplicate(c);
+ processInputBuffer(c);
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
@@ -3101,7 +3106,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
continue;
}
}
- processInputBufferAndReplicate(c);
+ processInputBuffer(c);
}
return processed;
}
diff --git a/src/replication.c b/src/replication.c
index 3e9910374..c59639cd1 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -39,6 +39,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
+long long adjustMeaningfulReplOffset();
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
@@ -2693,6 +2694,9 @@ void replicationCacheMaster(client *c) {
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
+ /* Adjust reploff and read_reploff to the last meaningful offset we executed.
+ * this is the offset the replica will use for future PSYNC. */
+ server.master->reploff = adjustMeaningfulReplOffset();
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
@@ -2717,33 +2721,15 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
-/* This function is called when a master is turend into a slave, in order to
- * create from scratch a cached master for the new client, that will allow
- * to PSYNC with the slave that was promoted as the new master after a
- * failover.
- *
- * Assuming this instance was previously the master instance of the new master,
- * the new master will accept its replication ID, and potentiall also the
- * current offset if no data was lost during the failover. So we use our
- * current replication ID and offset in order to synthesize a cached master. */
-void replicationCacheMasterUsingMyself(void) {
- serverLog(LL_NOTICE,
- "Before turning into a replica, using my own master parameters "
- "to synthesize a cached master: I may be able to synchronize with "
- "the new master with just a partial transfer.");
-
- /* This will be used to populate the field server.master->reploff
- * by replicationCreateMasterClient(). We'll later set the created
- * master as server.cached_master, so the replica will use such
- * offset for PSYNC. */
- server.master_initial_offset = server.master_repl_offset;
-
- /* However if the "meaningful" offset, that is the offset without
- * the final PINGs in the stream, is different, use this instead:
- * often when the master is no longer reachable, replicas will never
- * receive the PINGs, however the master will end with an incremented
- * offset because of the PINGs and will not be able to incrementally
- * PSYNC with the new master. */
+/* If the "meaningful" offset, that is the offset without the final PINGs
+ * in the stream, is different than the last offset, use it instead:
+ * often when the master is no longer reachable, replicas will never
+ * receive the PINGs, however the master will end with an incremented
+ * offset because of the PINGs and will not be able to incrementally
+ * PSYNC with the new master.
+ * This function trims the replication backlog when needed, and returns
+ * the offset to be used for future partial sync. */
+long long adjustMeaningfulReplOffset() {
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
long long delta = server.master_repl_offset -
server.master_repl_meaningful_offset;
@@ -2753,7 +2739,6 @@ void replicationCacheMasterUsingMyself(void) {
server.master_repl_meaningful_offset,
server.master_repl_offset,
delta);
- server.master_initial_offset = server.master_repl_meaningful_offset;
server.master_repl_offset = server.master_repl_meaningful_offset;
if (server.repl_backlog_histlen <= delta) {
server.repl_backlog_histlen = 0;
@@ -2765,6 +2750,29 @@ void replicationCacheMasterUsingMyself(void) {
server.repl_backlog_size;
}
}
+ return server.master_repl_offset;
+}
+
+/* This function is called when a master is turend into a slave, in order to
+ * create from scratch a cached master for the new client, that will allow
+ * to PSYNC with the slave that was promoted as the new master after a
+ * failover.
+ *
+ * Assuming this instance was previously the master instance of the new master,
+ * the new master will accept its replication ID, and potentiall also the
+ * current offset if no data was lost during the failover. So we use our
+ * current replication ID and offset in order to synthesize a cached master. */
+void replicationCacheMasterUsingMyself(void) {
+ serverLog(LL_NOTICE,
+ "Before turning into a replica, using my own master parameters "
+ "to synthesize a cached master: I may be able to synchronize with "
+ "the new master with just a partial transfer.");
+
+ /* This will be used to populate the field server.master->reploff
+ * by replicationCreateMasterClient(). We'll later set the created
+ * master as server.cached_master, so the replica will use such
+ * offset for PSYNC. */
+ server.master_initial_offset = adjustMeaningfulReplOffset();
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
diff --git a/src/server.h b/src/server.h
index 9e1e506af..41d767e13 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1600,7 +1600,6 @@ void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
void processInputBuffer(client *c);
-void processInputBufferAndReplicate(client *c);
void processGopherRequest(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl
index 333736ffa..4e1189e0b 100644
--- a/tests/integration/psync2.tcl
+++ b/tests/integration/psync2.tcl
@@ -44,6 +44,7 @@ start_server {} {
set used [list $master_id]
test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
$R($master_id) slaveof no one
+ $R($master_id) config set repl-ping-replica-period 1 ;# increse the chance that random ping will cause issues
if {$counter_value == 0} {
$R($master_id) set x $counter_value
}
@@ -114,23 +115,20 @@ start_server {} {
}
}
- # wait for all the slaves to be in sync with the master
- set master_ofs [status $R($master_id) master_repl_offset]
+ # wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too
wait_for_condition 500 100 {
- $master_ofs == [status $R(0) master_repl_offset] &&
- $master_ofs == [status $R(1) master_repl_offset] &&
- $master_ofs == [status $R(2) master_repl_offset] &&
- $master_ofs == [status $R(3) master_repl_offset] &&
- $master_ofs == [status $R(4) master_repl_offset]
+ [status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
+ [status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
+ [status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
+ [status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
+ [status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
} else {
- if {$debug_msg} {
- for {set j 0} {$j < 5} {incr j} {
- puts "$j: sync_full: [status $R($j) sync_full]"
- puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
- puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
- puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
- puts "---"
- }
+ for {set j 0} {$j < 5} {incr j} {
+ puts "$j: sync_full: [status $R($j) sync_full]"
+ puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
+ puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
+ puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
+ puts "---"
}
fail "Slaves are not in sync with the master after too long time."
}
@@ -175,9 +173,14 @@ start_server {} {
$R($j) slaveof $master_host $master_port
}
- # Wait for slaves to sync
+ # Wait for replicas to sync. it is not enough to just wait for connected_slaves==4
+ # since we might do the check before the master realized that they're disconnected
wait_for_condition 50 1000 {
- [status $R($master_id) connected_slaves] == 4
+ [status $R($master_id) connected_slaves] == 4 &&
+ [status $R([expr {($master_id+1)%5}]) master_link_status] == "up" &&
+ [status $R([expr {($master_id+2)%5}]) master_link_status] == "up" &&
+ [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
+ [status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
fail "Replica not reconnecting"
}
@@ -188,6 +191,7 @@ start_server {} {
set slave_id [expr {($master_id+1)%5}]
set sync_count [status $R($master_id) sync_full]
set sync_partial [status $R($master_id) sync_partial_ok]
+ set sync_partial_err [status $R($master_id) sync_partial_err]
catch {
$R($slave_id) config rewrite
$R($slave_id) debug restart
@@ -197,7 +201,11 @@ start_server {} {
wait_for_condition 50 1000 {
[status $R($master_id) sync_partial_ok] == $sync_partial + 1
} else {
- fail "Replica not reconnecting"
+ puts "prev sync_full: $sync_count"
+ puts "prev sync_partial_ok: $sync_partial"
+ puts "prev sync_partial_err: $sync_partial_err"
+ puts [$R($master_id) info stats]
+ fail "Replica didn't partial sync"
}
set new_sync_count [status $R($master_id) sync_full]
assert {$sync_count == $new_sync_count}
@@ -271,3 +279,103 @@ start_server {} {
}
}}}}}
+
+start_server {tags {"psync2"}} {
+start_server {} {
+start_server {} {
+start_server {} {
+start_server {} {
+ test {pings at the end of replication stream are ignored for psync} {
+ set master [srv -4 client]
+ set master_host [srv -4 host]
+ set master_port [srv -4 port]
+ set replica1 [srv -3 client]
+ set replica2 [srv -2 client]
+ set replica3 [srv -1 client]
+ set replica4 [srv -0 client]
+
+ $replica1 replicaof $master_host $master_port
+ $replica2 replicaof $master_host $master_port
+ $replica3 replicaof $master_host $master_port
+ $replica4 replicaof $master_host $master_port
+ wait_for_condition 50 1000 {
+ [status $master connected_slaves] == 4
+ } else {
+ fail "replicas didn't connect"
+ }
+
+ $master incr x
+ wait_for_condition 50 1000 {
+ [$replica1 get x] == 1 && [$replica2 get x] == 1 &&
+ [$replica3 get x] == 1 && [$replica4 get x] == 1
+ } else {
+ fail "replicas didn't get incr"
+ }
+
+ # disconnect replica1 and replica2
+ # and wait for the master to send a ping to replica3 and replica4
+ $replica1 replicaof no one
+ $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
+ $master config set repl-ping-replica-period 1
+ after 1500
+
+ # make everyone sync from the replica1 that didn't get the last ping from the old master
+ # replica4 will keep syncing from the old master which now syncs from replica1
+ # and replica2 will re-connect to the old master (which went back in time)
+ set new_master_host [srv -3 host]
+ set new_master_port [srv -3 port]
+ $replica3 replicaof $new_master_host $new_master_port
+ $master replicaof $new_master_host $new_master_port
+ $replica2 replicaof $master_host $master_port
+ wait_for_condition 50 1000 {
+ [status $replica2 master_link_status] == "up" &&
+ [status $replica3 master_link_status] == "up" &&
+ [status $replica4 master_link_status] == "up" &&
+ [status $master master_link_status] == "up"
+ } else {
+ fail "replicas didn't connect"
+ }
+
+ # make sure replication is still alive and kicking
+ $replica1 incr x
+ wait_for_condition 50 1000 {
+ [$replica2 get x] == 2 &&
+ [$replica3 get x] == 2 &&
+ [$replica4 get x] == 2 &&
+ [$master get x] == 2
+ } else {
+ fail "replicas didn't get incr"
+ }
+
+ # make sure there are full syncs other than the initial ones
+ assert_equal [status $master sync_full] 4
+ assert_equal [status $replica1 sync_full] 0
+ assert_equal [status $replica2 sync_full] 0
+ assert_equal [status $replica3 sync_full] 0
+ assert_equal [status $replica4 sync_full] 0
+
+ # force psync
+ $master client kill type master
+ $replica2 client kill type master
+ $replica3 client kill type master
+ $replica4 client kill type master
+
+ # make sure replication is still alive and kicking
+ $replica1 incr x
+ wait_for_condition 50 1000 {
+ [$replica2 get x] == 3 &&
+ [$replica3 get x] == 3 &&
+ [$replica4 get x] == 3 &&
+ [$master get x] == 3
+ } else {
+ fail "replicas didn't get incr"
+ }
+
+ # make sure there are full syncs other than the initial ones
+ assert_equal [status $master sync_full] 4
+ assert_equal [status $replica1 sync_full] 0
+ assert_equal [status $replica2 sync_full] 0
+ assert_equal [status $replica3 sync_full] 0
+ assert_equal [status $replica4 sync_full] 0
+}
+}}}}}