summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/blocked.c5
-rw-r--r--src/networking.c18
-rw-r--r--src/server.c10
-rw-r--r--tests/unit/pause.tcl51
4 files changed, 75 insertions, 9 deletions
diff --git a/src/blocked.c b/src/blocked.c
index ba47796b6..6ce4b9893 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -87,6 +87,11 @@ typedef struct bkinfo {
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
+ /* Master client should never be blocked unless pause or module */
+ serverAssert(!(c->flags & CLIENT_MASTER &&
+ btype != BLOCKED_MODULE &&
+ btype != BLOCKED_PAUSE));
+
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
diff --git a/src/networking.c b/src/networking.c
index c18b8f247..6487c0fdb 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1989,19 +1989,23 @@ int processMultibulkBuffer(client *c) {
* 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) {
+ /* If client is blocked(including paused), just return avoid reset and replicate.
+ *
+ * 1. Don't reset the client structure for blocked clients, so that the reply
+ * callback will still be able to access the client argv and argc fields.
+ * The client will be reset in unblockClient().
+ * 2. Don't update replication offset or propagate commands to replicas,
+ * since we have not applied the command. */
+ if (c->flags & CLIENT_BLOCKED) return;
+
+ resetClient(c);
+
long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
- /* Don't reset the client structure for blocked clients, so that the reply
- * callback will still be able to access the client argv and argc fields.
- * The client will be reset in unblockClient(). */
- if (!(c->flags & CLIENT_BLOCKED)) {
- resetClient(c);
- }
-
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
diff --git a/src/server.c b/src/server.c
index 7ebc1e72f..15a4080d4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -5115,11 +5115,15 @@ sds genRedisInfoString(const char *section) {
server.masterhost == NULL ? "master" : "slave");
if (server.masterhost) {
long long slave_repl_offset = 1;
+ long long slave_read_repl_offset = 1;
- if (server.master)
+ if (server.master) {
slave_repl_offset = server.master->reploff;
- else if (server.cached_master)
+ slave_read_repl_offset = server.master->read_reploff;
+ } else if (server.cached_master) {
slave_repl_offset = server.cached_master->reploff;
+ slave_read_repl_offset = server.cached_master->read_reploff;
+ }
info = sdscatprintf(info,
"master_host:%s\r\n"
@@ -5127,6 +5131,7 @@ sds genRedisInfoString(const char *section) {
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n"
+ "slave_read_repl_offset:%lld\r\n"
"slave_repl_offset:%lld\r\n"
,server.masterhost,
server.masterport,
@@ -5135,6 +5140,7 @@ sds genRedisInfoString(const char *section) {
server.master ?
((int)(server.unixtime-server.master->lastinteraction)) : -1,
server.repl_state == REPL_STATE_TRANSFER,
+ slave_read_repl_offset,
slave_repl_offset
);
diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl
index 35d8dff6b..a9a1f3554 100644
--- a/tests/unit/pause.tcl
+++ b/tests/unit/pause.tcl
@@ -163,6 +163,57 @@ start_server {tags {"pause network"}} {
$rd close
}
+ start_server {tags {needs:repl external:skip}} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+
+ # Avoid PINGs
+ $master config set repl-ping-replica-period 3600
+ r replicaof $master_host $master_port
+
+ wait_for_condition 50 100 {
+ [s master_link_status] eq {up}
+ } else {
+ fail "Replication not started."
+ }
+
+ test "Test when replica paused, offset would not grow" {
+ $master set foo bar
+ set old_master_offset [status $master master_repl_offset]
+
+ wait_for_condition 50 100 {
+ [s slave_repl_offset] == [status $master master_repl_offset]
+ } else {
+ fail "Replication offset not matched."
+ }
+
+ r client pause 100000 write
+ $master set foo2 bar2
+
+ # Make sure replica received data from master
+ wait_for_condition 50 100 {
+ [s slave_read_repl_offset] == [status $master master_repl_offset]
+ } else {
+ fail "Replication not work."
+ }
+
+ # Replica would not apply the write command
+ assert {[s slave_repl_offset] == $old_master_offset}
+ r get foo2
+ } {}
+
+ test "Test replica offset would grow after unpause" {
+ r client unpause
+ wait_for_condition 50 100 {
+ [s slave_repl_offset] == [status $master master_repl_offset]
+ } else {
+ fail "Replication not continue."
+ }
+ r get foo2
+ } {bar2}
+ }
+
# Make sure we unpause at the end
r client unpause
}