diff options
-rw-r--r-- | src/blocked.c | 5 | ||||
-rw-r--r-- | src/networking.c | 18 | ||||
-rw-r--r-- | src/server.c | 10 | ||||
-rw-r--r-- | tests/unit/pause.tcl | 51 |
4 files changed, 75 insertions, 9 deletions
diff --git a/src/blocked.c b/src/blocked.c index ba47796b6..6ce4b9893 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -87,6 +87,11 @@ typedef struct bkinfo { * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { + /* Master client should never be blocked unless pause or module */ + serverAssert(!(c->flags & CLIENT_MASTER && + btype != BLOCKED_MODULE && + btype != BLOCKED_PAUSE)); + c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; diff --git a/src/networking.c b/src/networking.c index c18b8f247..6487c0fdb 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1989,19 +1989,23 @@ int processMultibulkBuffer(client *c) { * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ void commandProcessed(client *c) { + /* If client is blocked(including paused), just return avoid reset and replicate. + * + * 1. Don't reset the client structure for blocked clients, so that the reply + * callback will still be able to access the client argv and argc fields. + * The client will be reset in unblockClient(). + * 2. Don't update replication offset or propagate commands to replicas, + * since we have not applied the command. */ + if (c->flags & CLIENT_BLOCKED) return; + + resetClient(c); + long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } - /* Don't reset the client structure for blocked clients, so that the reply - * callback will still be able to access the client argv and argc fields. - * The client will be reset in unblockClient(). */ - if (!(c->flags & CLIENT_BLOCKED)) { - resetClient(c); - } - /* If the client is a master we need to compute the difference * between the applied offset before and after processing the buffer, * to understand how much of the replication stream was actually diff --git a/src/server.c b/src/server.c index 7ebc1e72f..15a4080d4 100644 --- a/src/server.c +++ b/src/server.c @@ -5115,11 +5115,15 @@ sds genRedisInfoString(const char *section) { server.masterhost == NULL ? "master" : "slave"); if (server.masterhost) { long long slave_repl_offset = 1; + long long slave_read_repl_offset = 1; - if (server.master) + if (server.master) { slave_repl_offset = server.master->reploff; - else if (server.cached_master) + slave_read_repl_offset = server.master->read_reploff; + } else if (server.cached_master) { slave_repl_offset = server.cached_master->reploff; + slave_read_repl_offset = server.cached_master->read_reploff; + } info = sdscatprintf(info, "master_host:%s\r\n" @@ -5127,6 +5131,7 @@ sds genRedisInfoString(const char *section) { "master_link_status:%s\r\n" "master_last_io_seconds_ago:%d\r\n" "master_sync_in_progress:%d\r\n" + "slave_read_repl_offset:%lld\r\n" "slave_repl_offset:%lld\r\n" ,server.masterhost, server.masterport, @@ -5135,6 +5140,7 @@ sds genRedisInfoString(const char *section) { server.master ? ((int)(server.unixtime-server.master->lastinteraction)) : -1, server.repl_state == REPL_STATE_TRANSFER, + slave_read_repl_offset, slave_repl_offset ); diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index 35d8dff6b..a9a1f3554 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -163,6 +163,57 @@ start_server {tags {"pause network"}} { $rd close } + start_server {tags {needs:repl external:skip}} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + + # Avoid PINGs + $master config set repl-ping-replica-period 3600 + r replicaof $master_host $master_port + + wait_for_condition 50 100 { + [s master_link_status] eq {up} + } else { + fail "Replication not started." + } + + test "Test when replica paused, offset would not grow" { + $master set foo bar + set old_master_offset [status $master master_repl_offset] + + wait_for_condition 50 100 { + [s slave_repl_offset] == [status $master master_repl_offset] + } else { + fail "Replication offset not matched." + } + + r client pause 100000 write + $master set foo2 bar2 + + # Make sure replica received data from master + wait_for_condition 50 100 { + [s slave_read_repl_offset] == [status $master master_repl_offset] + } else { + fail "Replication not work." + } + + # Replica would not apply the write command + assert {[s slave_repl_offset] == $old_master_offset} + r get foo2 + } {} + + test "Test replica offset would grow after unpause" { + r client unpause + wait_for_condition 50 100 { + [s slave_repl_offset] == [status $master master_repl_offset] + } else { + fail "Replication not continue." + } + r get foo2 + } {bar2} + } + # Make sure we unpause at the end r client unpause } |