summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-04-19 10:25:45 +0200
committerantirez <antirez@gmail.com>2017-04-20 07:58:22 +0200
commita91cc5bc2df15d16d29ddd263ee04e482fa8762f (patch)
tree945ed77a1c7f6c6f0b19eacc3e60acec0193dd2c
parent278972ceb1cc24663c6acfc2654a04148a3ed26f (diff)
downloadredis-a91cc5bc2df15d16d29ddd263ee04e482fa8762f.tar.gz
Fix PSYNC2 incomplete command bug as described in #3899.
This bug was discovered by @kevinmcgehee and constituted a major hidden bug in the PSYNC2 implementation, caused by the propagation of incomplete commands from the master to slaves. The bug had several results: 1. Borrowing from Kevin's text in the issue: "Given that slaves blindly copy over their master's input into their own replication backlog over successive read syscalls, it's possible that with large commands or small TCP buffers, partial commands are present in this buffer. If the master were to fail before successfully propagating the entire command to a slave, the slaves will never execute the partial command (since the client is invalidated) but will copy it to replication backlog which may relay those invalid bytes to its slaves on PSYNC2, corrupting the backlog and possibly other valid commands that follow the failover. Simple command boundaries aren't sufficient to capture this, either, because in the case of a MULTI/EXEC block, if the master successfully propagates a subset of the commands but not the EXEC, then the transaction in the backlog becomes corrupt and could corrupt other slaves that consume this data." 2. As identified by @yangsiran later, there is another effect of the bug. Through the same mechanism as the first problem, a slave having another slave could receive a full resynchronization request with an already half-applied command in the backlog. Once the RDB is ready, it will be sent to the slave, and the replication will continue sending to the sub-slave the other half of the command, which is not valid. The fix, designed by @yangsiran and @antirez, and implemented by @antirez, uses a secondary buffer in order to feed the sub-masters and update the replication backlog and offsets, only when a given part of the query buffer is actually *applied* to the state of the instance, that is, when the command gets processed and the command is not pending in the Redis transaction buffer because of CLIENT_MULTI state.
Given that the backlog and offset representations are now in agreement with the commands actually processed, issues 1 and 2 should no longer be possible. Thanks to @kevinmcgehee, @yangsiran and @oranagra for their work in identifying and designing a fix for this problem.
-rw-r--r--src/networking.c42
-rw-r--r--src/replication.c7
-rw-r--r--src/server.h6
3 files changed, 47 insertions, 8 deletions
diff --git a/src/networking.c b/src/networking.c
index fbab9970f..fae8e52bd 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -93,6 +93,7 @@ client *createClient(int fd) {
c->name = NULL;
c->bufpos = 0;
c->querybuf = sdsempty();
+ c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@@ -107,6 +108,7 @@ client *createClient(int fd) {
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
+ c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
@@ -796,6 +798,7 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
+ sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@@ -1318,8 +1321,13 @@ void processInputBuffer(client *c) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
- if (processCommand(c) == C_OK)
+ if (processCommand(c) == C_OK) {
+ if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
+ /* Update the applied replication offset of our master. */
+ c->reploff = c->read_reploff - sdslen(c->querybuf);
+ }
resetClient(c);
+ }
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) break;
@@ -1366,15 +1374,17 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
+ } else if (c->flags & CLIENT_MASTER) {
+ /* Append the query buffer to the pending (not applied) buffer
+ * of the master. We'll use this buffer later in order to have a
+ * copy of the string applied by the last command executed. */
+ c->pending_querybuf = sdscatlen(c->pending_querybuf,
+ c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) {
- c->reploff += nread;
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->querybuf+qblen,nread);
- }
+ if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@@ -1386,7 +1396,25 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
freeClient(c);
return;
}
- processInputBuffer(c);
+
+ /* Time to process the buffer. If the client is a master we need to
+ * compute the difference between the applied offset before and after
+ * processing the buffer, to understand how much of the replication stream
+ * was actually applied to the master state: this quantity, and its
+ * corresponding part of the replication stream, will be propagated to
+ * the sub-slaves and to the replication backlog. */
+ if (!(c->flags & CLIENT_MASTER)) {
+ processInputBuffer(c);
+ } else {
+ size_t prev_offset = c->reploff;
+ processInputBuffer(c);
+ size_t applied = c->reploff - prev_offset;
+ if (applied) {
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->pending_querybuf, applied);
+ sdsrange(c->pending_querybuf,applied,-1);
+ }
+ }
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
diff --git a/src/replication.c b/src/replication.c
index c7a703b85..91ede828d 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1078,6 +1078,7 @@ void replicationCreateMasterClient(int fd, int dbid) {
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
+ server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
@@ -2118,6 +2119,12 @@ void replicationCacheMaster(client *c) {
/* Unlink the client from the server structures. */
unlinkClient(c);
+ /* Fix the master specific fields: we want to discard the non processed
+ * query buffers and non processed offsets. */
+ sdsclear(server.master->querybuf);
+ sdsclear(server.master->pending_querybuf);
+ server.master->read_reploff = server.master->reploff;
+
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
diff --git a/src/server.h b/src/server.h
index 19be92ba2..8cc172149 100644
--- a/src/server.h
+++ b/src/server.h
@@ -663,6 +663,9 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
+ sds pending_querybuf; /* If this is a master, this buffer represents the
+ yet not applied replication stream that we
+ are receiving from the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
@@ -685,7 +688,8 @@ typedef struct client {
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
- long long reploff; /* Replication offset if this is our master. */
+ long long read_reploff; /* Read replication offset if this is a master. */
+ long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves