summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2018-07-14 16:49:15 +0300
committerOran Agra <oran@redislabs.com>2018-07-17 12:51:49 +0300
commitd55598988b3381afff87bc974f5cfda0e8ae77e0 (patch)
tree1ab97d0d501cf53dd3e2f51b392805ff3d188422
parentcefe21d28a75f4fdbf24823ce42e777c2b9d5c6f (diff)
downloadredis-d55598988b3381afff87bc974f5cfda0e8ae77e0.tar.gz
fix rare replication stream corruption with disk-based replication
The slave sends \n keepalive messages to the master while parsing the rdb, and later sends REPLCONF ACK once a second. rarely, the master recives both a linefeed char and a REPLCONF in the same read, \n*3\r\n$8\r\nREPLCONF\r\n... and it tries to trim two chars (\r\n) from the query buffer, trimming the '*' from *3\r\n$8\r\nREPLCONF\r\n... then the master tries to process a command starting with '3' and replies to the slave a bunch of -ERR and one +OK. although the slave silently ignores these (prints a log message), this corrupts the replication offset at the slave since the slave increases the replication offset, and the master did not. other than the fix in processInlineBuffer, i did several other improvments while hunting this very rare bug. - when redis replies with "unknown command" it includes a portion of the arguments, not just the command name. so it would be easier to understand what was recived, in my case, on the slave side, it was -ERR, but the "arguments" were the interesting part (containing info on the error). - about a year ago i added code in addReplyErrorLength to print the error to the log in case of a reply to master (since this string isn't actually trasmitted to the master), now changed that block to print a similar log message to indicate an error being sent from the master to the slave. note that the slave is marked as CLIENT_SLAVE only after PSYNC was received, so this will not cause any harm for REPLCONF, and will only indicate problems that are gonna corrupt the replication stream anyway. - two places were c->reply was emptied, and i wanted to reset sentlen this is a precaution (i did not actually see such a problem), since a non-zero sentlen will cause corruption to be transmitted on the socket.
-rw-r--r--src/networking.c17
-rw-r--r--src/replication.c1
-rw-r--r--src/server.c9
3 files changed, 18 insertions, 9 deletions
diff --git a/src/networking.c b/src/networking.c
index d23185ffa..f6882f697 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -342,11 +342,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
if (!len || s[0] != '-') addReplyString(c,"-ERR ",5);
addReplyString(c,s,len);
addReplyString(c,"\r\n",2);
- if (c->flags & CLIENT_MASTER) {
+ if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
+ char* to = c->flags & CLIENT_MASTER? "master": "slave";
+ char* from = c->flags & CLIENT_MASTER? "slave": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
- serverLog(LL_WARNING,"== CRITICAL == This slave is sending an error "
- "to its master: '%s' after processing the command "
- "'%s'", s, cmdname);
+ serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
+ "to its %s: '%s' after processing the command "
+ "'%s'", from, to, s, cmdname);
}
}
@@ -608,6 +610,7 @@ void addReplySubcommandSyntaxError(client *c) {
* destination client. */
void copyClientOutputBuffer(client *dst, client *src) {
listRelease(dst->reply);
+ dst->sentlen = 0;
dst->reply = listDup(src->reply);
memcpy(dst->buf,src->buf,src->bufpos);
dst->bufpos = src->bufpos;
@@ -1094,7 +1097,7 @@ void resetClient(client *c) {
* with the error and close the connection. */
int processInlineBuffer(client *c) {
char *newline;
- int argc, j;
+ int argc, j, linefeed_chars = 1;
sds *argv, aux;
size_t querylen;
@@ -1112,7 +1115,7 @@ int processInlineBuffer(client *c) {
/* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
- newline--;
+ newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf);
@@ -1132,7 +1135,7 @@ int processInlineBuffer(client *c) {
c->repl_ack_time = server.unixtime;
/* Leave data after the first line of the query in the buffer */
- sdsrange(c->querybuf,querylen+2,-1);
+ sdsrange(c->querybuf,querylen+linefeed_chars,-1);
/* Setup argv array on client structure */
if (argc) {
diff --git a/src/replication.c b/src/replication.c
index bc3755398..6d589c012 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -2148,6 +2148,7 @@ void replicationCacheMaster(client *c) {
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
+ c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
diff --git a/src/server.c b/src/server.c
index 971595872..1e717b23c 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2449,8 +2449,13 @@ int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
- addReplyErrorFormat(c,"unknown command '%s'",
- (char*)c->argv[0]->ptr);
+ sds args = sdsempty();
+ int i;
+ for (i=1; i < c->argc && sdslen(args) < 128; i++)
+ args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
+ addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
+ (char*)c->argv[0]->ptr, args);
+ sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {