From 14c4ddb5a62a52e30ee169d36b516a78a410a5b4 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 14 Aug 2018 00:43:36 +0800 Subject: pipeline: do not sdsrange querybuf unless all commands processed This is an optimization for processing pipeline, we discussed a problem in issue #5229: clients may be paused if we apply `CLIENT PAUSE` command, and then querybuf may grow too large, the cost of memmove in sdsrange after parsing a completed command will be horrible. The optimization is that parsing all commands in queyrbuf , after that we can just call sdsrange only once. --- src/networking.c | 87 ++++++++++++++++++++++++++++++-------------------------- src/server.h | 1 + 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/src/networking.c b/src/networking.c index af7422178..945fdd961 100644 --- a/src/networking.c +++ b/src/networking.c @@ -110,6 +110,7 @@ client *createClient(int fd) { c->fd = fd; c->name = NULL; c->bufpos = 0; + c->qb_pos = 0; c->querybuf = sdsempty(); c->pending_querybuf = sdsempty(); c->querybuf_peak = 0; @@ -1119,11 +1120,11 @@ int processInlineBuffer(client *c) { size_t querylen; /* Search for end of line */ - newline = strchr(c->querybuf,'\n'); + newline = strchr(c->querybuf+c->qb_pos,'\n'); /* Nothing to do without a \r\n */ if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big inline request"); setProtocolError("too big inline request",c,0); } @@ -1131,12 +1132,12 @@ int processInlineBuffer(client *c) { } /* Handle the \r\n case. */ - if (newline && newline != c->querybuf && *(newline-1) == '\r') + if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r') newline--, linefeed_chars++; /* Split the input buffer up to the \r\n */ - querylen = newline-(c->querybuf); - aux = sdsnewlen(c->querybuf,querylen); + querylen = newline-(c->querybuf+c->qb_pos); + aux = sdsnewlen(c->querybuf+c->qb_pos,querylen); argv = sdssplitargs(aux,&argc); sdsfree(aux); if (argv == NULL) { @@ -1152,7 +1153,8 @@ int processInlineBuffer(client *c) { c->repl_ack_time = server.unixtime; /* Leave data after the first line of the query in the buffer */ - sdsrange(c->querybuf,querylen+linefeed_chars,-1); + sdsrange(c->querybuf,c->qb_pos+querylen+linefeed_chars,-1); + c->qb_pos = 0; /* Setup argv array on client structure */ if (argc) { @@ -1182,10 +1184,10 @@ static void setProtocolError(const char *errstr, client *c, long pos) { /* Sample some protocol to given an idea about what was inside. */ char buf[256]; - if (sdslen(c->querybuf) < PROTO_DUMP_LEN) { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf); + if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { + snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos); } else { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); + snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); } /* Remove non printable chars. */ @@ -1202,6 +1204,7 @@ static void setProtocolError(const char *errstr, client *c, long pos) { } c->flags |= CLIENT_CLOSE_AFTER_REPLY; sdsrange(c->querybuf,pos,-1); + c->qb_pos -= pos; } /* Process the query buffer for client 'c', setting up the client argument @@ -1217,7 +1220,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) { * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ int processMultibulkBuffer(client *c) { char *newline = NULL; - long pos = 0; int ok; long long ll; @@ -1226,9 +1228,9 @@ int processMultibulkBuffer(client *c) { serverAssertWithInfo(c,NULL,c->argc == 0); /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf,'\r'); + newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big mbulk count string"); setProtocolError("too big mbulk count string",c,0); } @@ -1236,22 +1238,23 @@ int processMultibulkBuffer(client *c) { } /* Buffer should also contain \n */ - if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) + if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) return C_ERR; /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ - serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); - ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); + serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); + ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > 1024*1024) { addReplyError(c,"Protocol error: invalid multibulk length"); - setProtocolError("invalid mbulk count",c,pos); + setProtocolError("invalid mbulk count",c,c->qb_pos); return C_ERR; } - pos = (newline-c->querybuf)+2; + c->qb_pos = (newline-c->querybuf)+2; if (ll <= 0) { - sdsrange(c->querybuf,pos,-1); + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; return C_OK; } @@ -1266,9 +1269,9 @@ int processMultibulkBuffer(client *c) { while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+pos,'\r'); + newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c, "Protocol error: too big bulk count string"); setProtocolError("too big bulk count string",c,0); @@ -1278,25 +1281,25 @@ int processMultibulkBuffer(client *c) { } /* Buffer should also contain \n */ - if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) + if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) break; - if (c->querybuf[pos] != '$') { + if (c->querybuf[c->qb_pos] != '$') { addReplyErrorFormat(c, "Protocol error: expected '$', got '%c'", - c->querybuf[pos]); - setProtocolError("expected $ but got something else",c,pos); + c->querybuf[c->qb_pos]); + setProtocolError("expected $ but got something else",c,c->qb_pos); return C_ERR; } - ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); + ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { addReplyError(c,"Protocol error: invalid bulk length"); - setProtocolError("invalid bulk length",c,pos); + setProtocolError("invalid bulk length",c,c->qb_pos); return C_ERR; } - pos += newline-(c->querybuf+pos)+2; + c->qb_pos = newline-c->querybuf+2; if (ll >= PROTO_MBULK_BIG_ARG) { size_t qblen; @@ -1304,8 +1307,8 @@ int processMultibulkBuffer(client *c) { * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ - sdsrange(c->querybuf,pos,-1); - pos = 0; + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; qblen = sdslen(c->querybuf); /* Hint the sds library about the amount of bytes this string is * going to contain. */ @@ -1316,14 +1319,14 @@ int processMultibulkBuffer(client *c) { } /* Read bulk argument */ - if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) { + if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { /* Not enough data (+2 == trailing \r\n) */ break; } else { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ - if (pos == 0 && + if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { @@ -1333,20 +1336,16 @@ int processMultibulkBuffer(client *c) { * likely... */ c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); - pos = 0; } else { c->argv[c->argc++] = - createStringObject(c->querybuf+pos,c->bulklen); - pos += c->bulklen+2; + createStringObject(c->querybuf+c->qb_pos,c->bulklen); + c->qb_pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; } } - /* Trim to pos */ - if (pos) sdsrange(c->querybuf,pos,-1); - /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) return C_OK; @@ -1360,8 +1359,9 @@ int processMultibulkBuffer(client *c) { * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { server.current_client = c; + /* Keep processing while there is something in the input buffer */ - while(sdslen(c->querybuf)) { + while(c->qb_pos < sdslen(c->querybuf)) { /* Return if clients are paused. */ if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; @@ -1377,7 +1377,7 @@ void processInputBuffer(client *c) { /* Determine request type when unknown. */ if (!c->reqtype) { - if (c->querybuf[0] == '*') { + if (c->querybuf[c->qb_pos] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; @@ -1400,7 +1400,7 @@ void processInputBuffer(client *c) { if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf); + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } /* Don't reset the client structure for clients blocked in a @@ -1416,6 +1416,13 @@ void processInputBuffer(client *c) { if (server.current_client == NULL) break; } } + + /* Trim to pos */ + if (c->qb_pos) { + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; + } + server.current_client = NULL; } diff --git a/src/server.h b/src/server.h index 186d08250..ce127b587 100644 --- a/src/server.h +++ b/src/server.h @@ -710,6 +710,7 @@ typedef struct client { redisDb *db; /* Pointer to currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ + size_t qb_pos; /* The position we have read in querybuf. */ sds pending_querybuf; /* If this client is flagged as master, this buffer represents the yet not applied portion of the replication stream that we are receiving from -- cgit v1.2.1