diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 113 |
1 files changed, 74 insertions, 39 deletions
diff --git a/src/networking.c b/src/networking.c index b05d02b1b..767d871d8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -147,7 +147,6 @@ client *createClient(connection *conn) { c->ref_block_pos = 0; c->qb_pos = 0; c->querybuf = sdsempty(); - c->pending_querybuf = sdsempty(); c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -167,6 +166,7 @@ client *createClient(connection *conn) { c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; c->read_reploff = 0; + c->repl_applied = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->repl_last_partial_write = 0; @@ -201,7 +201,7 @@ client *createClient(connection *conn) { c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_usage = 0; c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; @@ -1568,7 +1568,6 @@ void freeClient(client *c) { /* Free the query buffer */ sdsfree(c->querybuf); - sdsfree(c->pending_querybuf); c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -1940,7 +1939,7 @@ int writeToClient(client *c, int handler_installed) { if (!clientHasPendingReplies(c)) { c->sentlen = 0; /* Note that writeToClient() is called in a threaded way, but - * adDeleteFileEvent() is not thread safe: however writeToClient() + * aeDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ if (handler_installed) { @@ -1954,7 +1953,11 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } - updateClientMemUsage(c); + /* Update client's memory usage after writing. + * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in + * handleClientsWithPendingWritesUsingThreads(). */ + if (io_threads_op == IO_THREADS_OP_IDLE) + updateClientMemUsage(c); return C_OK; } @@ -2128,7 +2131,7 @@ int processInlineBuffer(client *c) { * we got some desynchronization in the protocol, for example * because of a PSYNC gone bad. * - * However the is an exception: masters may send us just a newline + * However there is an exception: masters may send us just a newline * to keep the connection active. */ if (querylen != 0 && c->flags & CLIENT_MASTER) { sdsfreesplitres(argv,argc); @@ -2292,8 +2295,12 @@ int processMultibulkBuffer(client *c) { } c->qb_pos = newline-c->querybuf+2; - if (ll >= PROTO_MBULK_BIG_ARG) { - /* If we are going to read a large object from network + if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { + /* When the client is not a master client (because master + * client's querybuf can only be trimmed after data applied + * and sent to replicas). + * + * If we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. @@ -2324,10 +2331,11 @@ int processMultibulkBuffer(client *c) { c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); } - /* Optimization: if the buffer contains JUST our bulk element + /* Optimization: if a non-master client's buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ - if (c->qb_pos == 0 && + if (!(c->flags & CLIENT_MASTER) && + c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { @@ -2388,8 +2396,8 @@ void commandProcessed(client *c) { if (c->flags & CLIENT_MASTER) { long long applied = c->reploff - prev_offset; if (applied) { - replicationFeedStreamFromMasterStream(c->pending_querybuf,applied); - sdsrange(c->pending_querybuf,applied,-1); + replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied); + c->repl_applied += applied; } } } @@ -2432,13 +2440,22 @@ int processCommandAndResetClient(client *c) { /* This function will execute any fully parsed commands pending on * the client. Returns C_ERR if the client is no longer valid after executing * the command, and C_OK for all other cases. */ -int processPendingCommandsAndResetClient(client *c) { +int processPendingCommandAndInputBuffer(client *c) { if (c->flags & CLIENT_PENDING_COMMAND) { c->flags &= ~CLIENT_PENDING_COMMAND; if (processCommandAndResetClient(c) == C_ERR) { return C_ERR; } } + + /* Now process client if it has more data in it's buffer. + * + * Note: when a master client steps into this function, + * it can always satisfy this condition, because its querbuf + * contains data not applied. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + return processInputBuffer(c); + } return C_OK; } @@ -2510,8 +2527,26 @@ int processInputBuffer(client *c) { } } - /* Trim to pos */ - if (c->qb_pos) { + if (c->flags & CLIENT_MASTER) { + /* If the client is a master, trim the querybuf to repl_applied, + * since master client is very special, its querybuf not only + * used to parse command, but also proxy to sub-replicas. + * + * Here are some scenarios we cannot trim to qb_pos: + * 1. we don't receive complete command from master + * 2. master client blocked cause of client pause + * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND + * + * In these scenarios, qb_pos points to the part of the current command + * or the beginning of next command, and the current command is not applied yet, + * so the repl_applied is not equal to qb_pos. */ + if (c->repl_applied) { + sdsrange(c->querybuf,c->repl_applied,-1); + c->qb_pos -= c->repl_applied; + c->repl_applied = 0; + } + } else if (c->qb_pos) { + /* Trim to pos */ sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } @@ -2519,7 +2554,8 @@ int processInputBuffer(client *c) { /* Update client memory usage after processing the query buffer, this is * important in case the query buffer is big and wasn't drained during * the above loop (because of partially sent big commands). */ - updateClientMemUsage(c); + if (io_threads_op == IO_THREADS_OP_IDLE) + updateClientMemUsage(c); return C_OK; } @@ -2546,16 +2582,22 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0) readlen = remaining; + + /* Master client needs expand the readlen when meet BIG_ARG(see #9100), + * but doesn't need align to the next arg, we can read more data. */ + if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) + readlen = PROTO_IOBUF_LEN; } qblen = sdslen(c->querybuf); - if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) { + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. + (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg * into the query buffer, so we don't need to pre-allocate more than we * need, so using the non-greedy growing. For an initial allocation of @@ -2585,12 +2627,6 @@ void readQueryFromClient(connection *conn) { } freeClientAsync(c); goto done; - } else if (c->flags & CLIENT_MASTER) { - /* Append the query buffer to the pending (not applied) buffer - * of the master. We'll use this buffer later in order to have a - * copy of the string applied by the last command executed. */ - c->pending_querybuf = sdscatlen(c->pending_querybuf, - c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); @@ -2868,8 +2904,6 @@ void clientCommand(client *c) { " Control the replies sent to the current connection.", "SETNAME <name>", " Assign the name <name> to the current connection.", -"GETNAME", -" Get the name of the current connection.", "UNBLOCK <clientid> [TIMEOUT|ERROR]", " Unblock the specified blocked client.", "TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]", @@ -3075,6 +3109,10 @@ NULL if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL) != C_OK) return; struct client *target = lookupClientByID(id); + /* Note that we never try to unblock a client blocked on a module command, which + * doesn't have a timeout callback (even in the case of UNBLOCK ERROR). + * The reason is that we assume that if a command doesn't expect to be timedout, + * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) { if (unblock_error) addReplyError(target, @@ -3464,7 +3502,11 @@ static void retainOriginalCommandVector(client *c) { * original_argv array. */ void redactClientCommandArgument(client *c, int argc) { retainOriginalCommandVector(c); - decrRefCount(c->argv[argc]); + if (c->original_argv[argc] == shared.redacted) { + /* This argument has already been redacted */ + return; + } + decrRefCount(c->original_argv[argc]); c->original_argv[argc] = shared.redacted; } @@ -4165,8 +4207,8 @@ int handleClientsWithPendingWritesUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); - /* Update the client in the mem usage buckets after we're done processing it in the io-threads */ - updateClientMemUsageBucket(c); + /* Update the client in the mem usage after we're done processing it in the io-threads */ + updateClientMemUsage(c); /* Install the write handler if there are pending writes in some * of the clients. */ @@ -4274,17 +4316,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { continue; } - /* Once io-threads are idle we can update the client in the mem usage buckets */ - updateClientMemUsageBucket(c); - - if (processPendingCommandsAndResetClient(c) == C_ERR) { - /* If the client is no longer valid, we avoid - * processing the client later. So we just go - * to the next. */ - continue; - } + /* Once io-threads are idle we can update the client in the mem usage */ + updateClientMemUsage(c); - if (processInputBuffer(c) == C_ERR) { + if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */ |