diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 106 |
1 files changed, 61 insertions, 45 deletions
diff --git a/src/networking.c b/src/networking.c index 58b553fe6..36ae26e0b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) { /* Client.reply list dup and free methods. */ void *dupClientReplyValue(void *o) { - return sdsdup(o); + clientReplyBlock *old = o; + clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); + memcpy(buf, o, sizeof(clientReplyBlock) + old->size); + return buf; } void freeClientReplyValue(void *o) { - sdsfree(o); + zfree(o); } int listMatchObjects(void *a, void *b) { @@ -240,25 +243,31 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { void _addReplyStringToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; - if (listLength(c->reply) == 0) { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } else { - listNode *ln = listLast(c->reply); - sds tail = listNodeValue(ln); - - /* Append to this object when possible. If tail == NULL it was - * set via addDeferredMultiBulkLength(). */ - if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) { - tail = sdscatlen(tail,s,len); - listNodeValue(ln) = tail; - c->reply_bytes += len; - } else { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + /* It is possible that we have a tail list node, but no tail buffer. + * if addDeferredMultiBulkLength() was used. */ + + /* Append to tail string when possible. */ + if (tail) { + /* Copy the part we can fit into the tail, and leave the rest for a new node */ + size_t avail = tail->size - tail->used; + size_t copy = avail >= len? len: avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES */ + size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; + tail = zmalloc(size + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + tail->used = len; + memcpy(tail->buf, s, len); + listAddNodeTail(c->reply, tail); + c->reply_bytes += tail->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -390,26 +399,35 @@ void *addDeferredMultiBulkLength(client *c) { /* Populate the length object and try gluing it to the next chunk. */ void setDeferredMultiBulkLength(client *c, void *node, long length) { listNode *ln = (listNode*)node; - sds len, next; + clientReplyBlock *next; + char lenstr[128]; + size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length); /* Abort when *node is NULL: when the client should not accept writes * we return NULL in addDeferredMultiBulkLength() */ if (node == NULL) return; - - len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length); - listNodeValue(ln) = len; - c->reply_bytes += sdslen(len); - if (ln->next != NULL) { - next = listNodeValue(ln->next); - - /* Only glue when the next node is non-NULL (an sds in this case) */ - if (next != NULL) { - len = sdscatsds(len,next); - listDelNode(c->reply,ln->next); - listNodeValue(ln) = len; - /* No need to update c->reply_bytes: we are just moving the same - * amount of bytes from one node to another. */ - } + serverAssert(!listNodeValue(ln)); + + /* Glue into next node when: + * - the next node is non-NULL, + * - it has enough room already allocated + * - and not too large (avoid large memmove) */ + if (ln->next != NULL && (next = listNodeValue(ln->next)) && + next->size - next->used >= lenstr_len && + next->used < PROTO_REPLY_CHUNK_BYTES * 4) { + memmove(next->buf + lenstr_len, next->buf, next->used); + memcpy(next->buf, lenstr, lenstr_len); + next->used += lenstr_len; + listDelNode(c->reply,ln); + } else { + /* Create a new node */ + clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation */ + buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); + buf->used = lenstr_len; + memcpy(buf->buf, lenstr, lenstr_len); + listNodeValue(ln) = buf; + c->reply_bytes += buf->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -895,7 +913,7 @@ client *lookupClientByID(uint64_t id) { int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; - sds o; + clientReplyBlock *o; while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { @@ -912,23 +930,24 @@ int writeToClient(int fd, client *c, int handler_installed) { } } else { o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o); + objlen = o->used; if (objlen == 0) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } - nwritten = write(fd, o + c->sentlen, objlen - c->sentlen); + nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; - c->reply_bytes -= objlen; /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) @@ -1899,10 +1918,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { - unsigned long list_item_size = sizeof(listNode)+5; - /* The +5 above means we assume an sds16 hdr, may not be true - * but is not going to be a problem. */ - + unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); return c->reply_bytes + (list_item_size*listLength(c->reply)); } |