summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c113
1 files changed, 74 insertions, 39 deletions
diff --git a/src/networking.c b/src/networking.c
index b05d02b1b..767d871d8 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -147,7 +147,6 @@ client *createClient(connection *conn) {
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
- c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@@ -167,6 +166,7 @@ client *createClient(connection *conn) {
c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
+ c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
@@ -201,7 +201,7 @@ client *createClient(connection *conn) {
c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
- c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
+ c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
@@ -1568,7 +1568,6 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
- sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@@ -1940,7 +1939,7 @@ int writeToClient(client *c, int handler_installed) {
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Note that writeToClient() is called in a threaded way, but
- * adDeleteFileEvent() is not thread safe: however writeToClient()
+ * aeDeleteFileEvent() is not thread safe: however writeToClient()
* is always called with handler_installed set to 0 from threads
* so we are fine. */
if (handler_installed) {
@@ -1954,7 +1953,11 @@ int writeToClient(client *c, int handler_installed) {
return C_ERR;
}
}
- updateClientMemUsage(c);
+ /* Update client's memory usage after writing.
+ * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
+ * handleClientsWithPendingWritesUsingThreads(). */
+ if (io_threads_op == IO_THREADS_OP_IDLE)
+ updateClientMemUsage(c);
return C_OK;
}
@@ -2128,7 +2131,7 @@ int processInlineBuffer(client *c) {
* we got some desynchronization in the protocol, for example
* because of a PSYNC gone bad.
*
- * However the is an exception: masters may send us just a newline
+ * However there is an exception: masters may send us just a newline
* to keep the connection active. */
if (querylen != 0 && c->flags & CLIENT_MASTER) {
sdsfreesplitres(argv,argc);
@@ -2292,8 +2295,12 @@ int processMultibulkBuffer(client *c) {
}
c->qb_pos = newline-c->querybuf+2;
- if (ll >= PROTO_MBULK_BIG_ARG) {
- /* If we are going to read a large object from network
+ if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
+ /* When the client is not a master client (because master
+ * client's querybuf can only be trimmed after data applied
+ * and sent to replicas).
+ *
+ * If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data.
@@ -2324,10 +2331,11 @@ int processMultibulkBuffer(client *c) {
c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
}
- /* Optimization: if the buffer contains JUST our bulk element
+ /* Optimization: if a non-master client's buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
- if (c->qb_pos == 0 &&
+ if (!(c->flags & CLIENT_MASTER) &&
+ c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
@@ -2388,8 +2396,8 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
- replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
- sdsrange(c->pending_querybuf,applied,-1);
+ replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);
+ c->repl_applied += applied;
}
}
}
@@ -2432,13 +2440,22 @@ int processCommandAndResetClient(client *c) {
/* This function will execute any fully parsed commands pending on
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
-int processPendingCommandsAndResetClient(client *c) {
+int processPendingCommandAndInputBuffer(client *c) {
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
+
+ /* Now process client if it has more data in it's buffer.
+ *
+ * Note: when a master client steps into this function,
+ * it can always satisfy this condition, because its querbuf
+ * contains data not applied. */
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ return processInputBuffer(c);
+ }
return C_OK;
}
@@ -2510,8 +2527,26 @@ int processInputBuffer(client *c) {
}
}
- /* Trim to pos */
- if (c->qb_pos) {
+ if (c->flags & CLIENT_MASTER) {
+ /* If the client is a master, trim the querybuf to repl_applied,
+ * since master client is very special, its querybuf not only
+ * used to parse command, but also proxy to sub-replicas.
+ *
+ * Here are some scenarios we cannot trim to qb_pos:
+ * 1. we don't receive complete command from master
+ * 2. master client blocked cause of client pause
+ * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
+ *
+ * In these scenarios, qb_pos points to the part of the current command
+ * or the beginning of next command, and the current command is not applied yet,
+ * so the repl_applied is not equal to qb_pos. */
+ if (c->repl_applied) {
+ sdsrange(c->querybuf,c->repl_applied,-1);
+ c->qb_pos -= c->repl_applied;
+ c->repl_applied = 0;
+ }
+ } else if (c->qb_pos) {
+ /* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
@@ -2519,7 +2554,8 @@ int processInputBuffer(client *c) {
/* Update client memory usage after processing the query buffer, this is
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
- updateClientMemUsage(c);
+ if (io_threads_op == IO_THREADS_OP_IDLE)
+ updateClientMemUsage(c);
return C_OK;
}
@@ -2546,16 +2582,22 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
- ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
+ ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0) readlen = remaining;
+
+ /* Master client needs expand the readlen when meet BIG_ARG(see #9100),
+ * but doesn't need align to the next arg, we can read more data. */
+ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
+ readlen = PROTO_IOBUF_LEN;
}
qblen = sdslen(c->querybuf);
- if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) {
+ if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
+ (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
* need, so using the non-greedy growing. For an initial allocation of
@@ -2585,12 +2627,6 @@ void readQueryFromClient(connection *conn) {
}
freeClientAsync(c);
goto done;
- } else if (c->flags & CLIENT_MASTER) {
- /* Append the query buffer to the pending (not applied) buffer
- * of the master. We'll use this buffer later in order to have a
- * copy of the string applied by the last command executed. */
- c->pending_querybuf = sdscatlen(c->pending_querybuf,
- c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
@@ -2868,8 +2904,6 @@ void clientCommand(client *c) {
" Control the replies sent to the current connection.",
"SETNAME <name>",
" Assign the name <name> to the current connection.",
-"GETNAME",
-" Get the name of the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR]",
" Unblock the specified blocked client.",
"TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]",
@@ -3075,6 +3109,10 @@ NULL
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
!= C_OK) return;
struct client *target = lookupClientByID(id);
+ /* Note that we never try to unblock a client blocked on a module command, which
+ * doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
+ * The reason is that we assume that if a command doesn't expect to be timedout,
+ * it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (unblock_error)
addReplyError(target,
@@ -3464,7 +3502,11 @@ static void retainOriginalCommandVector(client *c) {
* original_argv array. */
void redactClientCommandArgument(client *c, int argc) {
retainOriginalCommandVector(c);
- decrRefCount(c->argv[argc]);
+ if (c->original_argv[argc] == shared.redacted) {
+ /* This argument has already been redacted */
+ return;
+ }
+ decrRefCount(c->original_argv[argc]);
c->original_argv[argc] = shared.redacted;
}
@@ -4165,8 +4207,8 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
- /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
- updateClientMemUsageBucket(c);
+ /* Update the client in the mem usage after we're done processing it in the io-threads */
+ updateClientMemUsage(c);
/* Install the write handler if there are pending writes in some
* of the clients. */
@@ -4274,17 +4316,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
continue;
}
- /* Once io-threads are idle we can update the client in the mem usage buckets */
- updateClientMemUsageBucket(c);
-
- if (processPendingCommandsAndResetClient(c) == C_ERR) {
- /* If the client is no longer valid, we avoid
- * processing the client later. So we just go
- * to the next. */
- continue;
- }
+ /* Once io-threads are idle we can update the client in the mem usage */
+ updateClientMemUsage(c);
- if (processInputBuffer(c) == C_ERR) {
+ if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */