summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c217
1 files changed, 151 insertions, 66 deletions
diff --git a/src/networking.c b/src/networking.c
index d514c11a5..9bb12787b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -140,6 +140,8 @@ client *createClient(connection *conn) {
c->name = NULL;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
+ c->ref_repl_buf_node = NULL;
+ c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
@@ -467,7 +469,7 @@ void afterErrorReply(client *c, const char *s, size_t len) {
"to its %s: '%.*s' after processing the command "
"'%s'", from, to, (int)len, s, cmdname);
if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
- server.repl_backlog_histlen > 0)
+ server.repl_backlog->histlen > 0)
{
showLatestBacklog();
}
@@ -985,30 +987,37 @@ void AddReplyFromClient(client *dst, client *src) {
closeClientOnOutputBufferLimitReached(dst, 1);
}
-/* Copy 'src' client output buffers into 'dst' client output buffers.
- * The function takes care of freeing the old output buffers of the
- * destination client. */
-void copyClientOutputBuffer(client *dst, client *src) {
- listEmpty(dst->reply);
- dst->sentlen = 0;
- dst->bufpos = 0;
- dst->reply_bytes = 0;
+/* Logically copy 'src' replica client buffers info to 'dst' replica.
+ * Basically increase referenced buffer block node reference count. */
+void copyReplicaOutputBuffer(client *dst, client *src) {
+ serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
- /* First copy src static buffer into dst (either static buffer or reply
- * list, maybe clients have different 'usable_buffer_size'). */
- _addReplyToBufferOrList(dst,src->buf,src->bufpos);
-
- /* Copy src reply list into the dest. */
- list* reply = listDup(src->reply);
- listJoin(dst->reply,reply);
- dst->reply_bytes += src->reply_bytes;
- listRelease(reply);
+ if (src->ref_repl_buf_node == NULL) return;
+ dst->ref_repl_buf_node = src->ref_repl_buf_node;
+ dst->ref_block_pos = src->ref_block_pos;
+ ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
}
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
- return c->bufpos || listLength(c->reply);
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ /* Replicas use global shared replication buffer instead of
+ * private output buffer. */
+ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
+ if (c->ref_repl_buf_node == NULL) return 0;
+
+ /* If the last replication buffer block content is totally sent,
+ * we have nothing to send. */
+ listNode *ln = listLast(server.repl_buffer_blocks);
+ replBufBlock *tail = listNodeValue(ln);
+ if (ln == c->ref_repl_buf_node &&
+ c->ref_block_pos == tail->used) return 0;
+
+ return 1;
+ } else {
+ return c->bufpos || listLength(c->reply);
+ }
}
void clientAcceptHandler(connection *conn) {
@@ -1395,6 +1404,7 @@ void freeClient(client *c) {
/* Free data structures. */
listRelease(c->reply);
+ freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
@@ -1542,6 +1552,77 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
+/* This function does actual writing output buffers to different types of
+ * clients, it is called by writeToClient.
+ * If we write successfully, it return C_OK, otherwise, C_ERR is returned,
+ * And 'nwritten' is a output parameter, it means how many bytes server write
+ * to client. */
+int _writeToClient(client *c, ssize_t *nwritten) {
+ *nwritten = 0;
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
+
+ replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
+ serverAssert(o->used >= c->ref_block_pos);
+ /* Send current block if it is not fully sent. */
+ if (o->used > c->ref_block_pos) {
+ *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
+ o->used-c->ref_block_pos);
+ if (*nwritten <= 0) return C_ERR;
+ c->ref_block_pos += *nwritten;
+ }
+
+ /* If we fully sent the object on head, go to the next one. */
+ listNode *next = listNextNode(c->ref_repl_buf_node);
+ if (next && c->ref_block_pos == o->used) {
+ o->refcount--;
+ ((replBufBlock *)(listNodeValue(next)))->refcount++;
+ c->ref_repl_buf_node = next;
+ c->ref_block_pos = 0;
+ incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+ }
+ return C_OK;
+ }
+
+ if (c->bufpos > 0) {
+ *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
+ if (*nwritten <= 0) return C_ERR;
+ c->sentlen += *nwritten;
+
+ /* If the buffer was sent, set bufpos to zero to continue with
+ * the remainder of the reply. */
+ if ((int)c->sentlen == c->bufpos) {
+ c->bufpos = 0;
+ c->sentlen = 0;
+ }
+ } else {
+ clientReplyBlock *o = listNodeValue(listFirst(c->reply));
+ size_t objlen = o->used;
+
+ if (objlen == 0) {
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply,listFirst(c->reply));
+ return C_OK;
+ }
+
+ *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
+ if (*nwritten <= 0) return C_ERR;
+ c->sentlen += *nwritten;
+
+ /* If we fully sent the object on head go to the next one */
+ if (c->sentlen == objlen) {
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply,listFirst(c->reply));
+ c->sentlen = 0;
+ /* If there are no longer objects in the list, we expect
+ * the count of reply bytes to be exactly zero. */
+ if (listLength(c->reply) == 0)
+ serverAssert(c->reply_bytes == 0);
+ }
+ }
+ return C_OK;
+}
+
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
* error. If handler_installed is set, it will attempt to clear the
@@ -1555,48 +1636,11 @@ int writeToClient(client *c, int handler_installed) {
atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
- size_t objlen;
- clientReplyBlock *o;
while(clientHasPendingReplies(c)) {
- if (c->bufpos > 0) {
- nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
- if (nwritten <= 0) break;
- c->sentlen += nwritten;
- totwritten += nwritten;
-
- /* If the buffer was sent, set bufpos to zero to continue with
- * the remainder of the reply. */
- if ((int)c->sentlen == c->bufpos) {
- c->bufpos = 0;
- c->sentlen = 0;
- }
- } else {
- o = listNodeValue(listFirst(c->reply));
- objlen = o->used;
-
- if (objlen == 0) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- continue;
- }
-
- nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
- if (nwritten <= 0) break;
- c->sentlen += nwritten;
- totwritten += nwritten;
-
- /* If we fully sent the object on head go to the next one */
- if (c->sentlen == objlen) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- c->sentlen = 0;
- /* If there are no longer objects in the list, we expect
- * the count of reply bytes to be exactly zero. */
- if (listLength(c->reply) == 0)
- serverAssert(c->reply_bytes == 0);
- }
- }
+ int ret = _writeToClient(c, &nwritten);
+ if (ret == C_ERR) break;
+ totwritten += nwritten;
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
@@ -2077,8 +2121,7 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->pending_querybuf, applied);
+ replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
@@ -2399,6 +2442,13 @@ sds catClientInfoString(sds s, client *client) {
/* Compute the total memory consumed by this client. */
size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
+ size_t used_blocks_of_repl_buf = 0;
+ if (client->ref_repl_buf_node) {
+ replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
+ replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
+ used_blocks_of_repl_buf = last->id - cur->id + 1;
+ }
+
sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL;
sds ret = sdscatfmt(s,
"id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
@@ -2419,7 +2469,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) client->argv_len_sum,
(unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
- (unsigned long long) listLength(client->reply),
+ (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
(unsigned long long) total_mem,
events,
@@ -3247,8 +3297,21 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
size_t getClientOutputBufferMemoryUsage(client *c) {
- size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
- return c->reply_bytes + (list_item_size*listLength(c->reply));
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ size_t repl_buf_size = 0;
+ size_t repl_node_num = 0;
+ size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
+ if (c->ref_repl_buf_node) {
+ replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
+ replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
+ repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
+ repl_node_num = last->id - cur->id + 1;
+ }
+ return repl_buf_size + (repl_node_size*repl_node_num);
+ } else {
+ size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
+ return c->reply_bytes + (list_item_size*listLength(c->reply));
+ }
}
/* Returns the total client's memory usage.
@@ -3332,8 +3395,18 @@ int checkClientOutputBufferLimits(client *c) {
* like normal clients. */
if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
+ /* Note that it doesn't make sense to set the replica clients output buffer
+ * limit lower than the repl-backlog-size config (partial sync will succeed
+ * and then replica will get disconnected).
+ * Such a configuration is ignored (the size of repl-backlog-size will be used).
+ * This doesn't have memory consumption implications since the replica client
+ * will share the backlog buffers memory. */
+ size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
+ if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
+ (long long)hard_limit_bytes < server.repl_backlog_size)
+ hard_limit_bytes = server.repl_backlog_size;
if (server.client_obuf_limits[class].hard_limit_bytes &&
- used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
+ used_mem >= hard_limit_bytes)
hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
@@ -3375,7 +3448,10 @@ int checkClientOutputBufferLimits(client *c) {
int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (!c->conn) return 0; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
- if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0;
+ /* Note that c->reply_bytes is irrelevant for replica clients
+ * (they use the global repl buffers). */
+ if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
+ c->flags & CLIENT_CLOSE_ASAP) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
@@ -3740,6 +3816,15 @@ int handleClientsWithPendingWritesUsingThreads(void) {
continue;
}
+ /* Since all replicas and replication backlog use global replication
+ * buffer, to guarantee data accessing thread safe, we must put all
+ * replicas client into io_threads_list[0] i.e. main thread handles
+ * sending the output buffer of all replicas. */
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ listAddNodeTail(io_threads_list[0],c);
+ continue;
+ }
+
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;