summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2018-08-26 16:30:49 +0200
committerGitHub <noreply@github.com>2018-08-26 16:30:49 +0200
commit80e16956529ba0999cfe30a171693c8f3d3a4098 (patch)
tree637c7ed7bb301b5a3ccb966f125bcfde8dd29a80
parent46d89a9abb5550b8b425c179e3c92d10e64f41fe (diff)
parentf2ad89a314a0be2ea50a598339ee903ec3f64b65 (diff)
downloadredis-80e16956529ba0999cfe30a171693c8f3d3a4098.tar.gz
Merge pull request #5244 from soloestoy/optimize-pipeline
pipeline: do not sdsrange querybuf unless all commands processed
-rw-r--r--src/networking.c107
-rw-r--r--src/server.h1
-rw-r--r--tests/unit/introspection.tcl2
3 files changed, 56 insertions, 54 deletions
diff --git a/src/networking.c b/src/networking.c
index af7422178..58248ced0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -33,7 +33,7 @@
#include <math.h>
#include <ctype.h>
-static void setProtocolError(const char *errstr, client *c, long pos);
+static void setProtocolError(const char *errstr, client *c);
/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
@@ -110,6 +110,7 @@ client *createClient(int fd) {
c->fd = fd;
c->name = NULL;
c->bufpos = 0;
+ c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
@@ -1119,29 +1120,29 @@ int processInlineBuffer(client *c) {
size_t querylen;
/* Search for end of line */
- newline = strchr(c->querybuf,'\n');
+ newline = strchr(c->querybuf+c->qb_pos,'\n');
/* Nothing to do without a \r\n */
if (newline == NULL) {
- if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+ if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request");
- setProtocolError("too big inline request",c,0);
+ setProtocolError("too big inline request",c);
}
return C_ERR;
}
/* Handle the \r\n case. */
- if (newline && newline != c->querybuf && *(newline-1) == '\r')
+ if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */
- querylen = newline-(c->querybuf);
- aux = sdsnewlen(c->querybuf,querylen);
+ querylen = newline-(c->querybuf+c->qb_pos);
+ aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
argv = sdssplitargs(aux,&argc);
sdsfree(aux);
if (argv == NULL) {
addReplyError(c,"Protocol error: unbalanced quotes in request");
- setProtocolError("unbalanced quotes in inline request",c,0);
+ setProtocolError("unbalanced quotes in inline request",c);
return C_ERR;
}
@@ -1151,8 +1152,8 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime;
- /* Leave data after the first line of the query in the buffer */
- sdsrange(c->querybuf,querylen+linefeed_chars,-1);
+ /* Move querybuffer position to the next query in the buffer. */
+ c->qb_pos += querylen+linefeed_chars;
/* Setup argv array on client structure */
if (argc) {
@@ -1173,19 +1174,19 @@ int processInlineBuffer(client *c) {
return C_OK;
}
-/* Helper function. Trims query buffer to make the function that processes
- * multi bulk requests idempotent. */
+/* Helper function. Record protocol erro details in server log,
+ * and set the client as CLIENT_CLOSE_AFTER_REPLY. */
#define PROTO_DUMP_LEN 128
-static void setProtocolError(const char *errstr, client *c, long pos) {
+static void setProtocolError(const char *errstr, client *c) {
if (server.verbosity <= LL_VERBOSE) {
sds client = catClientInfoString(sdsempty(),c);
/* Sample some protocol to given an idea about what was inside. */
char buf[256];
- if (sdslen(c->querybuf) < PROTO_DUMP_LEN) {
- snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf);
+ if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
+ snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
} else {
- snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
+ snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
}
/* Remove non printable chars. */
@@ -1201,7 +1202,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
sdsfree(client);
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
- sdsrange(c->querybuf,pos,-1);
}
/* Process the query buffer for client 'c', setting up the client argument
@@ -1217,7 +1217,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
* to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
int processMultibulkBuffer(client *c) {
char *newline = NULL;
- long pos = 0;
int ok;
long long ll;
@@ -1226,34 +1225,32 @@ int processMultibulkBuffer(client *c) {
serverAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
- newline = strchr(c->querybuf,'\r');
+ newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) {
- if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+ if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
- setProtocolError("too big mbulk count string",c,0);
+ setProtocolError("too big mbulk count string",c);
}
return C_ERR;
}
/* Buffer should also contain \n */
- if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
+ if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
return C_ERR;
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
- serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
- ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
+ serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
+ ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
- setProtocolError("invalid mbulk count",c,pos);
+ setProtocolError("invalid mbulk count",c);
return C_ERR;
}
- pos = (newline-c->querybuf)+2;
- if (ll <= 0) {
- sdsrange(c->querybuf,pos,-1);
- return C_OK;
- }
+ c->qb_pos = (newline-c->querybuf)+2;
+
+ if (ll <= 0) return C_OK;
c->multibulklen = ll;
@@ -1266,37 +1263,37 @@ int processMultibulkBuffer(client *c) {
while(c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
- newline = strchr(c->querybuf+pos,'\r');
+ newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) {
- if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+ if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,
"Protocol error: too big bulk count string");
- setProtocolError("too big bulk count string",c,0);
+ setProtocolError("too big bulk count string",c);
return C_ERR;
}
break;
}
/* Buffer should also contain \n */
- if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
+ if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
break;
- if (c->querybuf[pos] != '$') {
+ if (c->querybuf[c->qb_pos] != '$') {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
- c->querybuf[pos]);
- setProtocolError("expected $ but got something else",c,pos);
+ c->querybuf[c->qb_pos]);
+ setProtocolError("expected $ but got something else",c);
return C_ERR;
}
- ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
+ ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
addReplyError(c,"Protocol error: invalid bulk length");
- setProtocolError("invalid bulk length",c,pos);
+ setProtocolError("invalid bulk length",c);
return C_ERR;
}
- pos += newline-(c->querybuf+pos)+2;
+ c->qb_pos = newline-c->querybuf+2;
if (ll >= PROTO_MBULK_BIG_ARG) {
size_t qblen;
@@ -1304,8 +1301,8 @@ int processMultibulkBuffer(client *c) {
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data. */
- sdsrange(c->querybuf,pos,-1);
- pos = 0;
+ sdsrange(c->querybuf,c->qb_pos,-1);
+ c->qb_pos = 0;
qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
@@ -1316,14 +1313,14 @@ int processMultibulkBuffer(client *c) {
}
/* Read bulk argument */
- if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) {
+ if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */
break;
} else {
/* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
- if (pos == 0 &&
+ if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
@@ -1333,20 +1330,16 @@ int processMultibulkBuffer(client *c) {
* likely... */
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf);
- pos = 0;
} else {
c->argv[c->argc++] =
- createStringObject(c->querybuf+pos,c->bulklen);
- pos += c->bulklen+2;
+ createStringObject(c->querybuf+c->qb_pos,c->bulklen);
+ c->qb_pos += c->bulklen+2;
}
c->bulklen = -1;
c->multibulklen--;
}
}
- /* Trim to pos */
- if (pos) sdsrange(c->querybuf,pos,-1);
-
/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK;
@@ -1360,8 +1353,9 @@ int processMultibulkBuffer(client *c) {
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
server.current_client = c;
+
/* Keep processing while there is something in the input buffer */
- while(sdslen(c->querybuf)) {
+ while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
@@ -1377,7 +1371,7 @@ void processInputBuffer(client *c) {
/* Determine request type when unknown. */
if (!c->reqtype) {
- if (c->querybuf[0] == '*') {
+ if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
@@ -1400,7 +1394,7 @@ void processInputBuffer(client *c) {
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
- c->reploff = c->read_reploff - sdslen(c->querybuf);
+ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
@@ -1416,6 +1410,13 @@ void processInputBuffer(client *c) {
if (server.current_client == NULL) break;
}
}
+
+ /* Trim to pos */
+ if (c->qb_pos) {
+ sdsrange(c->querybuf,c->qb_pos,-1);
+ c->qb_pos = 0;
+ }
+
server.current_client = NULL;
}
diff --git a/src/server.h b/src/server.h
index 186d08250..ce127b587 100644
--- a/src/server.h
+++ b/src/server.h
@@ -710,6 +710,7 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
+ size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl
index f6477d9c5..2581eb83d 100644
--- a/tests/unit/introspection.tcl
+++ b/tests/unit/introspection.tcl
@@ -1,7 +1,7 @@
start_server {tags {"introspection"}} {
test {CLIENT LIST} {
r client list
- } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*}
+ } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*}
test {MONITOR can log executed commands} {
set rd [redis_deferring_client]