summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaozhao.zz <276441700@qq.com>2022-03-25 10:45:40 +0800
committerGitHub <noreply@github.com>2022-03-25 10:45:40 +0800
commit78bef6e1fe4b69e9cca6a922911bd88a92584edb (patch)
tree118d28a0dc758f5af8155e6fda4d89e17383cd9b
parent1a57af629c4fa4e6bac35602a5373445ca42753b (diff)
downloadredis-78bef6e1fe4b69e9cca6a922911bd88a92584edb.tar.gz
optimize(remove) usage of client's pending_querybuf (#10413)
To remove `pending_querybuf`, the key point is reusing `querybuf`, it means master client's `querybuf` is not only used to parse command, but also proxy to sub-replicas. 1. add a new variable `repl_applied` for master client to record how many data applied (propagated via `replicationFeedStreamFromMasterStream()`) but not trimmed in `querybuf`. 2. don't sdsrange `querybuf` in `commandProcessed()`, we trim it to `repl_applied` after the whole replication pipeline processed to avoid fragmented `sdsrange`. And here are some scenarios we cannot trim to `qb_pos`: * we don't receive complete command from master * master client blocked because of client pause * IO threads operate read, master client flagged with CLIENT_PENDING_COMMAND In these scenarios, `qb_pos` points to the part of the current command or the beginning of next command, and the current command is not applied yet, so the `repl_applied` is not equal to `qb_pos`. Some other notes: * Do not do big arg optimization on master client, since we can only sdsrange `querybuf` after data sent to replicas. * Set `qb_pos` and `repl_applied` to 0 when `freeClient` in `replicationCacheMaster`. * Rewrite `processPendingCommandsAndResetClient` to `processPendingCommandAndInputBuffer`, let `processInputBuffer` to be called successively after `processCommandAndResetClient`.
-rw-r--r--src/blocked.c7
-rw-r--r--src/networking.c78
-rw-r--r--src/replication.c3
-rw-r--r--src/server.c18
-rw-r--r--src/server.h9
-rw-r--r--tests/support/util.tcl4
6 files changed, 58 insertions, 61 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 2754da9e0..65b584213 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -141,12 +141,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
- if (processPendingCommandsAndResetClient(c) == C_OK) {
- /* Now process client if it has more data in it's buffer. */
- if (c->querybuf && sdslen(c->querybuf) > 0) {
- if (processInputBuffer(c) == C_ERR) c = NULL;
- }
- } else {
+ if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
}
}
diff --git a/src/networking.c b/src/networking.c
index a96a2e492..767d871d8 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -147,7 +147,6 @@ client *createClient(connection *conn) {
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
- c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@@ -167,6 +166,7 @@ client *createClient(connection *conn) {
c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
+ c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
@@ -1568,7 +1568,6 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
- sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@@ -2296,8 +2295,12 @@ int processMultibulkBuffer(client *c) {
}
c->qb_pos = newline-c->querybuf+2;
- if (ll >= PROTO_MBULK_BIG_ARG) {
- /* If we are going to read a large object from network
+ if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
+ /* When the client is not a master client (because master
+ * client's querybuf can only be trimmed after data applied
+ * and sent to replicas).
+ *
+ * If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data.
@@ -2328,10 +2331,11 @@ int processMultibulkBuffer(client *c) {
c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
}
- /* Optimization: if the buffer contains JUST our bulk element
+ /* Optimization: if a non-master client's buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
- if (c->qb_pos == 0 &&
+ if (!(c->flags & CLIENT_MASTER) &&
+ c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
@@ -2392,8 +2396,8 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
- replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
- sdsrange(c->pending_querybuf,applied,-1);
+ replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);
+ c->repl_applied += applied;
}
}
}
@@ -2436,13 +2440,22 @@ int processCommandAndResetClient(client *c) {
/* This function will execute any fully parsed commands pending on
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
-int processPendingCommandsAndResetClient(client *c) {
+int processPendingCommandAndInputBuffer(client *c) {
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
+
+ /* Now process client if it has more data in it's buffer.
+ *
+ * Note: when a master client steps into this function,
+ * it can always satisfy this condition, because its querbuf
+ * contains data not applied. */
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ return processInputBuffer(c);
+ }
return C_OK;
}
@@ -2514,8 +2527,26 @@ int processInputBuffer(client *c) {
}
}
- /* Trim to pos */
- if (c->qb_pos) {
+ if (c->flags & CLIENT_MASTER) {
+ /* If the client is a master, trim the querybuf to repl_applied,
+ * since master client is very special, its querybuf not only
+ * used to parse command, but also proxy to sub-replicas.
+ *
+ * Here are some scenarios we cannot trim to qb_pos:
+ * 1. we don't receive complete command from master
+ * 2. master client blocked cause of client pause
+ * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
+ *
+ * In these scenarios, qb_pos points to the part of the current command
+ * or the beginning of next command, and the current command is not applied yet,
+ * so the repl_applied is not equal to qb_pos. */
+ if (c->repl_applied) {
+ sdsrange(c->querybuf,c->repl_applied,-1);
+ c->qb_pos -= c->repl_applied;
+ c->repl_applied = 0;
+ }
+ } else if (c->qb_pos) {
+ /* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
@@ -2551,16 +2582,22 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
- ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
+ ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0) readlen = remaining;
+
+ /* Master client needs expand the readlen when meet BIG_ARG(see #9100),
+ * but doesn't need align to the next arg, we can read more data. */
+ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
+ readlen = PROTO_IOBUF_LEN;
}
qblen = sdslen(c->querybuf);
- if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) {
+ if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
+ (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
* need, so using the non-greedy growing. For an initial allocation of
@@ -2590,12 +2627,6 @@ void readQueryFromClient(connection *conn) {
}
freeClientAsync(c);
goto done;
- } else if (c->flags & CLIENT_MASTER) {
- /* Append the query buffer to the pending (not applied) buffer
- * of the master. We'll use this buffer later in order to have a
- * copy of the string applied by the last command executed. */
- c->pending_querybuf = sdscatlen(c->pending_querybuf,
- c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
@@ -4288,14 +4319,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* Once io-threads are idle we can update the client in the mem usage */
updateClientMemUsage(c);
- if (processPendingCommandsAndResetClient(c) == C_ERR) {
- /* If the client is no longer valid, we avoid
- * processing the client later. So we just go
- * to the next. */
- continue;
- }
-
- if (processInputBuffer(c) == C_ERR) {
+ if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
diff --git a/src/replication.c b/src/replication.c
index b93c512fc..e9a754ab4 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -3197,7 +3197,8 @@ void replicationCacheMaster(client *c) {
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(server.master->querybuf);
- sdsclear(server.master->pending_querybuf);
+ server.master->qb_pos = 0;
+ server.master->repl_applied = 0;
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
diff --git a/src/server.c b/src/server.c
index 531709d8a..84f21fed3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -710,23 +710,6 @@ int clientsCronResizeQueryBuffer(client *c) {
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen > c->querybuf_peak)
c->querybuf_peak = c->bulklen;
-
- /* Clients representing masters also use a "pending query buffer" that
- * is the yet not applied part of the stream we are reading. Such buffer
- * also needs resizing from time to time, otherwise after a very large
- * transfer (a huge value or a big MIGRATE operation) it will keep using
- * a lot of memory. */
- if (c->flags & CLIENT_MASTER) {
- /* There are two conditions to resize the pending query buffer:
- * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
- * 2) Used length is smaller than pending_querybuf_size/2 */
- size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
- if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
- sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
- {
- c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
- }
- }
return 0;
}
@@ -6418,7 +6401,6 @@ void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
- dismissSds(c->pending_querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
diff --git a/src/server.h b/src/server.h
index 5f5417de1..ba072e7d3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -166,8 +166,6 @@ typedef long long ustime_t; /* microsecond time type. */
#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */
#define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */
-#define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */
-
#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */
/* When configuring the server eventloop, we setup it so that the total number
@@ -1082,10 +1080,6 @@ typedef struct client {
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
- sds pending_querybuf; /* If this client is flagged as master, this buffer
- represents the yet not applied portion of the
- replication stream that we are receiving from
- the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
@@ -1122,6 +1116,7 @@ typedef struct client {
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
+ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
@@ -2848,7 +2843,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
size_t freeMemoryGetNotCountedMemory();
int overMaxmemoryAfterAlloc(size_t moremem);
int processCommand(client *c);
-int processPendingCommandsAndResetClient(client *c);
+int processPendingCommandAndInputBuffer(client *c);
void setupSignalHandlers(void);
void removeSignalHandlers(void);
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler);
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 46c9654c8..4ad96ab10 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -122,7 +122,7 @@ proc wait_replica_online r {
wait_for_condition 50 100 {
[string match "*slave0:*,state=online*" [$r info replication]]
} else {
- fail "replica didn't sync in time"
+ fail "replica didn't online in time"
}
}
@@ -130,7 +130,7 @@ proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
} else {
- fail "replica didn't sync in time"
+ fail "replica offset didn't match in time"
}
}