summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author antirez <antirez@gmail.com> 2020-05-27 12:39:02 +0200
committer antirez <antirez@gmail.com> 2020-05-27 12:39:02 +0200
commit f135aef0dbb3ba5033210a2bdbe41196beefd025 (patch)
tree 249e36d403decb295122a92053268747477f11cb
parent 32378b7fad48bc6c2fc7293ad827fc8674485928 (diff)
download redis-f135aef0dbb3ba5033210a2bdbe41196beefd025.tar.gz
Revert "Keep track of meaningful replication offset in replicas too"
This reverts commit 4447ddc8bb36879db9fe49498165b360bf35ba1b.
-rw-r--r-- src/blocked.c 2
-rw-r--r-- src/networking.c 93
-rw-r--r-- src/replication.c 65
-rw-r--r-- src/server.h 1
-rw-r--r-- tests/integration/psync2.tcl 111
5 files changed, 76 insertions, 196 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 92f1cee65..4af79d886 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -110,7 +110,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) {
- processInputBuffer(c);
+ processInputBufferAndReplicate(c);
}
}
}
diff --git a/src/networking.c b/src/networking.c
index a8ea2ff94..60c3fb067 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1749,55 +1749,12 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}
-/* Perform necessary tasks after a command was executed:
- *
- * 1. The client is reset unless there are reasons to avoid doing it.
- * 2. In the case of master clients, the replication offset is updated.
- * 3. Propagate commands we got from our master to replicas down the line. */
-void commandProcessed(client *c) {
- int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
- long long prev_offset = c->reploff;
- if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
- /* Update the applied replication offset of our master. */
- c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
- }
-
- /* Don't reset the client structure for clients blocked in a
- * module blocking command, so that the reply callback will
- * still be able to access the client argv and argc field.
- * The client will be reset in unblockClientFromModule(). */
- if (!(c->flags & CLIENT_BLOCKED) ||
- c->btype != BLOCKED_MODULE)
- {
- resetClient(c);
- }
-
- /* If the client is a master we need to compute the difference
- * between the applied offset before and after processing the buffer,
- * to understand how much of the replication stream was actually
- * applied to the master state: this quantity, and its corresponding
- * part of the replication stream, will be propagated to the
- * sub-replicas and to the replication backlog. */
- if (c->flags & CLIENT_MASTER) {
- long long applied = c->reploff - prev_offset;
- long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset;
- if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->pending_querybuf, applied);
- sdsrange(c->pending_querybuf,applied,-1);
- }
- /* The server.master_repl_meaningful_offset variable represents
- * the offset of the replication stream without the pending PINGs. */
- if (cmd_is_ping)
- server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
- }
-}
-
/* This function calls processCommand(), but also performs a few sub tasks
- * for the client that are useful in that context:
+ * that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
- * 2. calls commandProcessed() if the command was handled.
+ * 2. In the case of master clients, the replication offset is updated.
+ * 3. The client is reset unless there are reasons to avoid doing it.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
@@ -1805,7 +1762,20 @@ int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
- commandProcessed(c);
+ if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
+ /* Update the applied replication offset of our master. */
+ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+ }
+
+ /* Don't reset the client structure for clients blocked in a
+ * module blocking command, so that the reply callback will
+ * still be able to access the client argv and argc field.
+ * The client will be reset in unblockClientFromModule(). */
+ if (!(c->flags & CLIENT_BLOCKED) ||
+ c->btype != BLOCKED_MODULE)
+ {
+ resetClient(c);
+ }
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
@@ -1902,6 +1872,31 @@ void processInputBuffer(client *c) {
}
}
+/* This is a wrapper for processInputBuffer that also cares about handling
+ * the replication forwarding to the sub-replicas, in case the client 'c'
+ * is flagged as master. Usually you want to call this instead of the
+ * raw processInputBuffer(). */
+void processInputBufferAndReplicate(client *c) {
+ if (!(c->flags & CLIENT_MASTER)) {
+ processInputBuffer(c);
+ } else {
+ /* If the client is a master we need to compute the difference
+ * between the applied offset before and after processing the buffer,
+ * to understand how much of the replication stream was actually
+ * applied to the master state: this quantity, and its corresponding
+ * part of the replication stream, will be propagated to the
+ * sub-replicas and to the replication backlog. */
+ size_t prev_offset = c->reploff;
+ processInputBuffer(c);
+ size_t applied = c->reploff - prev_offset;
+ if (applied) {
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->pending_querybuf, applied);
+ sdsrange(c->pending_querybuf,applied,-1);
+ }
+ }
+}
+
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
@@ -1969,7 +1964,7 @@ void readQueryFromClient(connection *conn) {
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
- processInputBuffer(c);
+ processInputBufferAndReplicate(c);
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
@@ -3198,7 +3193,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
continue;
}
}
- processInputBuffer(c);
+ processInputBufferAndReplicate(c);
}
return processed;
}
diff --git a/src/replication.c b/src/replication.c
index 99f233380..12500c0af 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -39,7 +39,6 @@
#include <sys/socket.h>
#include <sys/stat.h>
-long long adjustMeaningfulReplOffset();
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
@@ -2755,10 +2754,6 @@ void replicationCacheMaster(client *c) {
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
-
- /* Adjust reploff and read_reploff to the last meaningful offset we
- * executed. This is the offset the replica will use for future PSYNC. */
- server.master->reploff = adjustMeaningfulReplOffset();
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
@@ -2783,15 +2778,33 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
-/* If the "meaningful" offset, that is the offset without the final PINGs
- * in the stream, is different than the last offset, use it instead:
- * often when the master is no longer reachable, replicas will never
- * receive the PINGs, however the master will end with an incremented
- * offset because of the PINGs and will not be able to incrementally
- * PSYNC with the new master.
- * This function trims the replication backlog when needed, and returns
- * the offset to be used for future partial sync. */
-long long adjustMeaningfulReplOffset() {
+/* This function is called when a master is turned into a slave, in order to
+ * create from scratch a cached master for the new client, that will allow
+ * to PSYNC with the slave that was promoted as the new master after a
+ * failover.
+ *
+ * Assuming this instance was previously the master instance of the new master,
+ * the new master will accept its replication ID, and potentially also the
+ * current offset if no data was lost during the failover. So we use our
+ * current replication ID and offset in order to synthesize a cached master. */
+void replicationCacheMasterUsingMyself(void) {
+ serverLog(LL_NOTICE,
+ "Before turning into a replica, using my own master parameters "
+ "to synthesize a cached master: I may be able to synchronize with "
+ "the new master with just a partial transfer.");
+
+ /* This will be used to populate the field server.master->reploff
+ * by replicationCreateMasterClient(). We'll later set the created
+ * master as server.cached_master, so the replica will use such
+ * offset for PSYNC. */
+ server.master_initial_offset = server.master_repl_offset;
+
+ /* However if the "meaningful" offset, that is the offset without
+ * the final PINGs in the stream, is different, use this instead:
+ * often when the master is no longer reachable, replicas will never
+ * receive the PINGs, however the master will end with an incremented
+ * offset because of the PINGs and will not be able to incrementally
+ * PSYNC with the new master. */
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
long long delta = server.master_repl_offset -
server.master_repl_meaningful_offset;
@@ -2801,6 +2814,7 @@ long long adjustMeaningfulReplOffset() {
server.master_repl_meaningful_offset,
server.master_repl_offset,
delta);
+ server.master_initial_offset = server.master_repl_meaningful_offset;
server.master_repl_offset = server.master_repl_meaningful_offset;
if (server.repl_backlog_histlen <= delta) {
server.repl_backlog_histlen = 0;
@@ -2812,29 +2826,6 @@ long long adjustMeaningfulReplOffset() {
server.repl_backlog_size;
}
}
- return server.master_repl_offset;
-}
-
-/* This function is called when a master is turend into a slave, in order to
- * create from scratch a cached master for the new client, that will allow
- * to PSYNC with the slave that was promoted as the new master after a
- * failover.
- *
- * Assuming this instance was previously the master instance of the new master,
- * the new master will accept its replication ID, and potentiall also the
- * current offset if no data was lost during the failover. So we use our
- * current replication ID and offset in order to synthesize a cached master. */
-void replicationCacheMasterUsingMyself(void) {
- serverLog(LL_NOTICE,
- "Before turning into a replica, using my own master parameters "
- "to synthesize a cached master: I may be able to synchronize with "
- "the new master with just a partial transfer.");
-
- /* This will be used to populate the field server.master->reploff
- * by replicationCreateMasterClient(). We'll later set the created
- * master as server.cached_master, so the replica will use such
- * offset for PSYNC. */
- server.master_initial_offset = adjustMeaningfulReplOffset();
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
diff --git a/src/server.h b/src/server.h
index f835bf5e9..4b4fd4e0d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1608,6 +1608,7 @@ void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
void processInputBuffer(client *c);
+void processInputBufferAndReplicate(client *c);
void processGopherRequest(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl
index 29a880f99..40ca55607 100644
--- a/tests/integration/psync2.tcl
+++ b/tests/integration/psync2.tcl
@@ -117,7 +117,6 @@ start_server {} {
set used [list $master_id]
test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
$R($master_id) slaveof no one
- $R($master_id) config set repl-ping-replica-period 1 ;# increse the chance that random ping will cause issues
if {$counter_value == 0} {
$R($master_id) set x $counter_value
}
@@ -259,14 +258,9 @@ start_server {} {
$R($j) slaveof $master_host $master_port
}
- # Wait for replicas to sync. it is not enough to just wait for connected_slaves==4
- # since we might do the check before the master realized that they're disconnected
+ # Wait for slaves to sync
wait_for_condition 50 1000 {
- [status $R($master_id) connected_slaves] == 4 &&
- [status $R([expr {($master_id+1)%5}]) master_link_status] == "up" &&
- [status $R([expr {($master_id+2)%5}]) master_link_status] == "up" &&
- [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
- [status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
+ [status $R($master_id) connected_slaves] == 4
} else {
show_cluster_status
fail "Replica not reconnecting"
@@ -278,7 +272,6 @@ start_server {} {
set slave_id [expr {($master_id+1)%5}]
set sync_count [status $R($master_id) sync_full]
set sync_partial [status $R($master_id) sync_partial_ok]
- set sync_partial_err [status $R($master_id) sync_partial_err]
catch {
$R($slave_id) config rewrite
$R($slave_id) debug restart
@@ -370,103 +363,3 @@ start_server {} {
}
}}}}}
-
-start_server {tags {"psync2"}} {
-start_server {} {
-start_server {} {
-start_server {} {
-start_server {} {
- test {pings at the end of replication stream are ignored for psync} {
- set master [srv -4 client]
- set master_host [srv -4 host]
- set master_port [srv -4 port]
- set replica1 [srv -3 client]
- set replica2 [srv -2 client]
- set replica3 [srv -1 client]
- set replica4 [srv -0 client]
-
- $replica1 replicaof $master_host $master_port
- $replica2 replicaof $master_host $master_port
- $replica3 replicaof $master_host $master_port
- $replica4 replicaof $master_host $master_port
- wait_for_condition 50 1000 {
- [status $master connected_slaves] == 4
- } else {
- fail "replicas didn't connect"
- }
-
- $master incr x
- wait_for_condition 50 1000 {
- [$replica1 get x] == 1 && [$replica2 get x] == 1 &&
- [$replica3 get x] == 1 && [$replica4 get x] == 1
- } else {
- fail "replicas didn't get incr"
- }
-
- # disconnect replica1 and replica2
- # and wait for the master to send a ping to replica3 and replica4
- $replica1 replicaof no one
- $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
- $master config set repl-ping-replica-period 1
- after 1500
-
- # make everyone sync from the replica1 that didn't get the last ping from the old master
- # replica4 will keep syncing from the old master which now syncs from replica1
- # and replica2 will re-connect to the old master (which went back in time)
- set new_master_host [srv -3 host]
- set new_master_port [srv -3 port]
- $replica3 replicaof $new_master_host $new_master_port
- $master replicaof $new_master_host $new_master_port
- $replica2 replicaof $master_host $master_port
- wait_for_condition 50 1000 {
- [status $replica2 master_link_status] == "up" &&
- [status $replica3 master_link_status] == "up" &&
- [status $replica4 master_link_status] == "up" &&
- [status $master master_link_status] == "up"
- } else {
- fail "replicas didn't connect"
- }
-
- # make sure replication is still alive and kicking
- $replica1 incr x
- wait_for_condition 50 1000 {
- [$replica2 get x] == 2 &&
- [$replica3 get x] == 2 &&
- [$replica4 get x] == 2 &&
- [$master get x] == 2
- } else {
- fail "replicas didn't get incr"
- }
-
- # make sure there are full syncs other than the initial ones
- assert_equal [status $master sync_full] 4
- assert_equal [status $replica1 sync_full] 0
- assert_equal [status $replica2 sync_full] 0
- assert_equal [status $replica3 sync_full] 0
- assert_equal [status $replica4 sync_full] 0
-
- # force psync
- $master client kill type master
- $replica2 client kill type master
- $replica3 client kill type master
- $replica4 client kill type master
-
- # make sure replication is still alive and kicking
- $replica1 incr x
- wait_for_condition 50 1000 {
- [$replica2 get x] == 3 &&
- [$replica3 get x] == 3 &&
- [$replica4 get x] == 3 &&
- [$master get x] == 3
- } else {
- fail "replicas didn't get incr"
- }
-
- # make sure there are full syncs other than the initial ones
- assert_equal [status $master sync_full] 4
- assert_equal [status $replica1 sync_full] 0
- assert_equal [status $replica2 sync_full] 0
- assert_equal [status $replica3 sync_full] 0
- assert_equal [status $replica4 sync_full] 0
-}
-}}}}}