diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 217 |
1 files changed, 151 insertions, 66 deletions
diff --git a/src/networking.c b/src/networking.c index d514c11a5..9bb12787b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -140,6 +140,8 @@ client *createClient(connection *conn) { c->name = NULL; c->bufpos = 0; c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf); + c->ref_repl_buf_node = NULL; + c->ref_block_pos = 0; c->qb_pos = 0; c->querybuf = sdsempty(); c->pending_querybuf = sdsempty(); @@ -467,7 +469,7 @@ void afterErrorReply(client *c, const char *s, size_t len) { "to its %s: '%.*s' after processing the command " "'%s'", from, to, (int)len, s, cmdname); if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog && - server.repl_backlog_histlen > 0) + server.repl_backlog->histlen > 0) { showLatestBacklog(); } @@ -985,30 +987,37 @@ void AddReplyFromClient(client *dst, client *src) { closeClientOnOutputBufferLimitReached(dst, 1); } -/* Copy 'src' client output buffers into 'dst' client output buffers. - * The function takes care of freeing the old output buffers of the - * destination client. */ -void copyClientOutputBuffer(client *dst, client *src) { - listEmpty(dst->reply); - dst->sentlen = 0; - dst->bufpos = 0; - dst->reply_bytes = 0; +/* Logically copy 'src' replica client buffers info to 'dst' replica. + * Basically increase referenced buffer block node reference count. */ +void copyReplicaOutputBuffer(client *dst, client *src) { + serverAssert(src->bufpos == 0 && listLength(src->reply) == 0); - /* First copy src static buffer into dst (either static buffer or reply - * list, maybe clients have different 'usable_buffer_size'). */ - _addReplyToBufferOrList(dst,src->buf,src->bufpos); - - /* Copy src reply list into the dest. */ - list* reply = listDup(src->reply); - listJoin(dst->reply,reply); - dst->reply_bytes += src->reply_bytes; - listRelease(reply); + if (src->ref_repl_buf_node == NULL) return; + dst->ref_repl_buf_node = src->ref_repl_buf_node; + dst->ref_block_pos = src->ref_block_pos; + ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++; } /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { - return c->bufpos || listLength(c->reply); + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + /* Replicas use global shared replication buffer instead of + * private output buffer. */ + serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); + if (c->ref_repl_buf_node == NULL) return 0; + + /* If the last replication buffer block content is totally sent, + * we have nothing to send. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = listNodeValue(ln); + if (ln == c->ref_repl_buf_node && + c->ref_block_pos == tail->used) return 0; + + return 1; + } else { + return c->bufpos || listLength(c->reply); + } } void clientAcceptHandler(connection *conn) { @@ -1395,6 +1404,7 @@ void freeClient(client *c) { /* Free data structures. */ listRelease(c->reply); + freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); @@ -1542,6 +1552,77 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } +/* This function does actual writing output buffers to different types of + * clients, it is called by writeToClient. + * If we write successfully, it return C_OK, otherwise, C_ERR is returned, + * And 'nwritten' is a output parameter, it means how many bytes server write + * to client. */ +int _writeToClient(client *c, ssize_t *nwritten) { + *nwritten = 0; + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); + + replBufBlock *o = listNodeValue(c->ref_repl_buf_node); + serverAssert(o->used >= c->ref_block_pos); + /* Send current block if it is not fully sent. */ + if (o->used > c->ref_block_pos) { + *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos, + o->used-c->ref_block_pos); + if (*nwritten <= 0) return C_ERR; + c->ref_block_pos += *nwritten; + } + + /* If we fully sent the object on head, go to the next one. */ + listNode *next = listNextNode(c->ref_repl_buf_node); + if (next && c->ref_block_pos == o->used) { + o->refcount--; + ((replBufBlock *)(listNodeValue(next)))->refcount++; + c->ref_repl_buf_node = next; + c->ref_block_pos = 0; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + return C_OK; + } + + if (c->bufpos > 0) { + *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (*nwritten <= 0) return C_ERR; + c->sentlen += *nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); + size_t objlen = o->used; + + if (objlen == 0) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + return C_OK; + } + + *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); + if (*nwritten <= 0) return C_ERR; + c->sentlen += *nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. */ + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } + } + return C_OK; +} + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some * error. If handler_installed is set, it will attempt to clear the @@ -1555,48 +1636,11 @@ int writeToClient(client *c, int handler_installed) { atomicIncr(server.stat_total_writes_processed, 1); ssize_t nwritten = 0, totwritten = 0; - size_t objlen; - clientReplyBlock *o; while(clientHasPendingReplies(c)) { - if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } else { - o = listNodeValue(listFirst(c->reply)); - objlen = o->used; - - if (objlen == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - continue; - } - - nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); - } - } + int ret = _writeToClient(c, &nwritten); + if (ret == C_ERR) break; + totwritten += nwritten; /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT * bytes, in a single threaded server it's a good idea to serve * other clients as well, even if a very large request comes from @@ -2077,8 +2121,7 @@ void commandProcessed(client *c) { if (c->flags & CLIENT_MASTER) { long long applied = c->reploff - prev_offset; if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); + replicationFeedStreamFromMasterStream(c->pending_querybuf,applied); sdsrange(c->pending_querybuf,applied,-1); } } @@ -2399,6 +2442,13 @@ sds catClientInfoString(sds s, client *client) { /* Compute the total memory consumed by this client. */ size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); + size_t used_blocks_of_repl_buf = 0; + if (client->ref_repl_buf_node) { + replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); + replBufBlock *cur = listNodeValue(client->ref_repl_buf_node); + used_blocks_of_repl_buf = last->id - cur->id + 1; + } + sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL; sds ret = sdscatfmt(s, "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", @@ -2419,7 +2469,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) client->argv_len_sum, (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, - (unsigned long long) listLength(client->reply), + (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ (unsigned long long) total_mem, events, @@ -3247,8 +3297,21 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ size_t getClientOutputBufferMemoryUsage(client *c) { - size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)); + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + size_t repl_buf_size = 0; + size_t repl_node_num = 0; + size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock); + if (c->ref_repl_buf_node) { + replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); + replBufBlock *cur = listNodeValue(c->ref_repl_buf_node); + repl_buf_size = last->repl_offset + last->size - cur->repl_offset; + repl_node_num = last->id - cur->id + 1; + } + return repl_buf_size + (repl_node_size*repl_node_num); + } else { + size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); + return c->reply_bytes + (list_item_size*listLength(c->reply)); + } } /* Returns the total client's memory usage. @@ -3332,8 +3395,18 @@ int checkClientOutputBufferLimits(client *c) { * like normal clients. */ if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL; + /* Note that it doesn't make sense to set the replica clients output buffer + * limit lower than the repl-backlog-size config (partial sync will succeed + * and then replica will get disconnected). + * Such a configuration is ignored (the size of repl-backlog-size will be used). + * This doesn't have memory consumption implications since the replica client + * will share the backlog buffers memory. */ + size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes; + if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes && + (long long)hard_limit_bytes < server.repl_backlog_size) + hard_limit_bytes = server.repl_backlog_size; if (server.client_obuf_limits[class].hard_limit_bytes && - used_mem >= server.client_obuf_limits[class].hard_limit_bytes) + used_mem >= hard_limit_bytes) hard = 1; if (server.client_obuf_limits[class].soft_limit_bytes && used_mem >= server.client_obuf_limits[class].soft_limit_bytes) @@ -3375,7 +3448,10 @@ int checkClientOutputBufferLimits(client *c) { int closeClientOnOutputBufferLimitReached(client *c, int async) { if (!c->conn) return 0; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); - if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0; + /* Note that c->reply_bytes is irrelevant for replica clients + * (they use the global repl buffers). */ + if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) || + c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); @@ -3740,6 +3816,15 @@ int handleClientsWithPendingWritesUsingThreads(void) { continue; } + /* Since all replicas and replication backlog use global replication + * buffer, to guarantee data accessing thread safe, we must put all + * replicas client into io_threads_list[0] i.e. main thread handles + * sending the output buffer of all replicas. */ + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + listAddNodeTail(io_threads_list[0],c); + continue; + } + int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; |