diff options
author | zhaozhao.zz <276441700@qq.com> | 2021-09-08 16:07:25 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-08 16:07:25 +0800 |
commit | 1b83353dc382959e218191f64d94edb9703552e3 (patch) | |
tree | 65c87fd59e43f7d24769d1e7e2f7bd1104246943 /src | |
parent | 74d9a35621d66a816e1c30de9cd84c461297495a (diff) | |
download | redis-1b83353dc382959e218191f64d94edb9703552e3.tar.gz |
Fix wrong offset when replica pause (#9448)
When a replica is paused, it does not apply any commands, even if the command comes from the master. If we feed the non-applied command to the replication stream, the replication offset would be wrong, and data would be lost after failover (since the replica's `master_repl_offset` grows but the command is not applied).
To fix it, here are the changes:
* In `commandProcessed`, don't update the replica's replication offset or propagate commands to sub-replicas while the replica is paused.
* Show `slave_read_repl_offset` in info reply.
* Add an assert to make sure the master client is never blocked except by a pause or by a module (some modules may block the client to do background (parallel) processing and forward the original blocking module command to the replica; this is not a good approach, but it works, so the assert excludes modules for now, but someday in the future all modules should rewrite blocking commands for propagation, like `BLPOP` does).
Diffstat (limited to 'src')
-rw-r--r-- | src/blocked.c | 5 | ||||
-rw-r--r-- | src/networking.c | 18 | ||||
-rw-r--r-- | src/server.c | 10 |
3 files changed, 24 insertions, 9 deletions
diff --git a/src/blocked.c b/src/blocked.c index ba47796b6..6ce4b9893 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -87,6 +87,11 @@ typedef struct bkinfo { * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { + /* Master client should never be blocked unless pause or module */ + serverAssert(!(c->flags & CLIENT_MASTER && + btype != BLOCKED_MODULE && + btype != BLOCKED_PAUSE)); + c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; diff --git a/src/networking.c b/src/networking.c index c18b8f247..6487c0fdb 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1989,19 +1989,23 @@ int processMultibulkBuffer(client *c) { * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ void commandProcessed(client *c) { + /* If client is blocked(including paused), just return avoid reset and replicate. + * + * 1. Don't reset the client structure for blocked clients, so that the reply + * callback will still be able to access the client argv and argc fields. + * The client will be reset in unblockClient(). + * 2. Don't update replication offset or propagate commands to replicas, + * since we have not applied the command. */ + if (c->flags & CLIENT_BLOCKED) return; + + resetClient(c); + long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } - /* Don't reset the client structure for blocked clients, so that the reply - * callback will still be able to access the client argv and argc fields. - * The client will be reset in unblockClient(). 
*/ - if (!(c->flags & CLIENT_BLOCKED)) { - resetClient(c); - } - /* If the client is a master we need to compute the difference * between the applied offset before and after processing the buffer, * to understand how much of the replication stream was actually diff --git a/src/server.c b/src/server.c index 7ebc1e72f..15a4080d4 100644 --- a/src/server.c +++ b/src/server.c @@ -5115,11 +5115,15 @@ sds genRedisInfoString(const char *section) { server.masterhost == NULL ? "master" : "slave"); if (server.masterhost) { long long slave_repl_offset = 1; + long long slave_read_repl_offset = 1; - if (server.master) + if (server.master) { slave_repl_offset = server.master->reploff; - else if (server.cached_master) + slave_read_repl_offset = server.master->read_reploff; + } else if (server.cached_master) { slave_repl_offset = server.cached_master->reploff; + slave_read_repl_offset = server.cached_master->read_reploff; + } info = sdscatprintf(info, "master_host:%s\r\n" @@ -5127,6 +5131,7 @@ sds genRedisInfoString(const char *section) { "master_link_status:%s\r\n" "master_last_io_seconds_ago:%d\r\n" "master_sync_in_progress:%d\r\n" + "slave_read_repl_offset:%lld\r\n" "slave_repl_offset:%lld\r\n" ,server.masterhost, server.masterport, @@ -5135,6 +5140,7 @@ sds genRedisInfoString(const char *section) { server.master ? ((int)(server.unixtime-server.master->lastinteraction)) : -1, server.repl_state == REPL_STATE_TRANSFER, + slave_read_repl_offset, slave_repl_offset ); |