diff options
-rw-r--r-- | src/aof.c | 2 | ||||
-rw-r--r-- | src/blocked.c | 5 | ||||
-rw-r--r-- | src/dict.c | 6 | ||||
-rw-r--r-- | src/module.c | 4 | ||||
-rw-r--r-- | src/networking.c | 138 | ||||
-rw-r--r-- | src/notify.c | 4 | ||||
-rw-r--r-- | src/replication.c | 2 | ||||
-rw-r--r-- | src/scripting.c | 4 | ||||
-rw-r--r-- | src/sds.c | 4 | ||||
-rw-r--r-- | src/server.c | 19 | ||||
-rw-r--r-- | src/server.h | 9 | ||||
-rw-r--r-- | src/stream.h | 1 | ||||
-rw-r--r-- | src/t_stream.c | 53 | ||||
-rw-r--r-- | src/zmalloc.c | 3 | ||||
-rw-r--r-- | src/zmalloc.h | 3 | ||||
-rw-r--r-- | tests/unit/maxmemory.tcl | 92 |
16 files changed, 265 insertions, 84 deletions
@@ -645,7 +645,7 @@ struct client *createFakeClient(void) { c->obuf_soft_limit_reached_time = 0; c->watched_keys = listCreate(); c->peerid = NULL; - listSetFreeMethod(c->reply,decrRefCountVoid); + listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); initClientMultiState(c); return c; diff --git a/src/blocked.c b/src/blocked.c index d8ae596d9..4a667501f 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -397,10 +397,7 @@ void handleClientsBlockedOnKeys(void) { } } - if (s->last_id.ms > gt->ms || - (s->last_id.ms == gt->ms && - s->last_id.seq > gt->seq)) - { + if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ diff --git a/src/dict.c b/src/dict.c index 738b38c46..2cf9d4839 100644 --- a/src/dict.c +++ b/src/dict.c @@ -705,8 +705,10 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) { * table, there will be no elements in both tables up to * the current rehashing index, so we jump if possible. * (this happens when going from big to small table). */ - if (i >= d->ht[1].size) i = d->rehashidx; - continue; + if (i >= d->ht[1].size) + i = d->rehashidx; + else + continue; } if (i >= d->ht[j].size) continue; /* Out of range for this table. 
*/ dictEntry *he = d->ht[j].table[i]; diff --git a/src/module.c b/src/module.c index a46b86a50..9809cd74e 100644 --- a/src/module.c +++ b/src/module.c @@ -2712,9 +2712,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch sds proto = sdsnewlen(c->buf,c->bufpos); c->bufpos = 0; while(listLength(c->reply)) { - sds o = listNodeValue(listFirst(c->reply)); + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); - proto = sdscatsds(proto,o); + proto = sdscatlen(proto,o->buf,o->used); listDelNode(c->reply,listFirst(c->reply)); } reply = moduleCreateCallReplyFromProto(ctx,proto); diff --git a/src/networking.c b/src/networking.c index 58b553fe6..22850a4b6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) { /* Client.reply list dup and free methods. */ void *dupClientReplyValue(void *o) { - return sdsdup(o); + clientReplyBlock *old = o; + clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); + memcpy(buf, o, sizeof(clientReplyBlock) + old->size); + return buf; } void freeClientReplyValue(void *o) { - sdsfree(o); + zfree(o); } int listMatchObjects(void *a, void *b) { @@ -240,25 +243,35 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { void _addReplyStringToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; - if (listLength(c->reply) == 0) { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } else { - listNode *ln = listLast(c->reply); - sds tail = listNodeValue(ln); - - /* Append to this object when possible. If tail == NULL it was - * set via addDeferredMultiBulkLength(). 
*/ - if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) { - tail = sdscatlen(tail,s,len); - listNodeValue(ln) = tail; - c->reply_bytes += len; - } else { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + + /* Note that 'tail' may be NULL even if we have a tail node, because when + * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just + * to fill it later, when the size of the bulk length is set. */ + + /* Append to tail string when possible. */ + if (tail) { + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = avail >= len? len: avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; + tail = zmalloc(size + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + tail->used = len; + memcpy(tail->buf, s, len); + listAddNodeTail(c->reply, tail); + c->reply_bytes += tail->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -329,11 +342,18 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { if (!len || s[0] != '-') addReplyString(c,"-ERR ",5); addReplyString(c,s,len); addReplyString(c,"\r\n",2); - if (c->flags & CLIENT_MASTER) { + if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) { + char* to = c->flags & CLIENT_MASTER? "master": "slave"; + char* from = c->flags & CLIENT_MASTER? "slave": "master"; char *cmdname = c->lastcmd ? 
c->lastcmd->name : "<unknown>"; - serverLog(LL_WARNING,"== CRITICAL == This slave is sending an error " - "to its master: '%s' after processing the command " - "'%s'", s, cmdname); + serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " + "to its %s: '%s' after processing the command " + "'%s'", from, to, s, cmdname); + /* Here we want to panic because when an instance is sending an + * error to another instance in the context of replication, this can + * only create some kind of offset or data desynchronization. Better + * to catch it ASAP and crash instead of continuing. */ + serverPanic("Continuing is unsafe: replication protocol violation."); } } @@ -390,26 +410,41 @@ void *addDeferredMultiBulkLength(client *c) { /* Populate the length object and try gluing it to the next chunk. */ void setDeferredMultiBulkLength(client *c, void *node, long length) { listNode *ln = (listNode*)node; - sds len, next; + clientReplyBlock *next; + char lenstr[128]; + size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length); /* Abort when *node is NULL: when the client should not accept writes * we return NULL in addDeferredMultiBulkLength() */ if (node == NULL) return; - - len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length); - listNodeValue(ln) = len; - c->reply_bytes += sdslen(len); - if (ln->next != NULL) { - next = listNodeValue(ln->next); - - /* Only glue when the next node is non-NULL (an sds in this case) */ - if (next != NULL) { - len = sdscatsds(len,next); - listDelNode(c->reply,ln->next); - listNodeValue(ln) = len; - /* No need to update c->reply_bytes: we are just moving the same - * amount of bytes from one node to another. */ - } + serverAssert(!listNodeValue(ln)); + + /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(), + * with a new buffer structure containing the protocol needed to specify + * the length of the array following. 
However sometimes when there is + * little memory to move, we may instead remove this NULL node, and prefix + * our protocol in the node immediately after to it, in order to save a + * write(2) syscall later. Conditions needed to do it: + * + * - The next node is non-NULL, + * - It has enough room already allocated + * - And not too large (avoid large memmove) */ + if (ln->next != NULL && (next = listNodeValue(ln->next)) && + next->size - next->used >= lenstr_len && + next->used < PROTO_REPLY_CHUNK_BYTES * 4) { + memmove(next->buf + lenstr_len, next->buf, next->used); + memcpy(next->buf, lenstr, lenstr_len); + next->used += lenstr_len; + listDelNode(c->reply,ln); + } else { + /* Create a new node */ + clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); + /* Take over the allocation's internal fragmentation */ + buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); + buf->used = lenstr_len; + memcpy(buf->buf, lenstr, lenstr_len); + listNodeValue(ln) = buf; + c->reply_bytes += buf->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -580,6 +615,7 @@ void addReplySubcommandSyntaxError(client *c) { * destination client. 
*/ void copyClientOutputBuffer(client *dst, client *src) { listRelease(dst->reply); + dst->sentlen = 0; dst->reply = listDup(src->reply); memcpy(dst->buf,src->buf,src->bufpos); dst->bufpos = src->bufpos; @@ -895,7 +931,7 @@ client *lookupClientByID(uint64_t id) { int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; - sds o; + clientReplyBlock *o; while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { @@ -912,23 +948,24 @@ int writeToClient(int fd, client *c, int handler_installed) { } } else { o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o); + objlen = o->used; if (objlen == 0) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } - nwritten = write(fd, o + c->sentlen, objlen - c->sentlen); + nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; - c->reply_bytes -= objlen; /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) @@ -1065,7 +1102,7 @@ void resetClient(client *c) { * with the error and close the connection. */ int processInlineBuffer(client *c) { char *newline; - int argc, j; + int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; @@ -1083,7 +1120,7 @@ int processInlineBuffer(client *c) { /* Handle the \r\n case. 
*/ if (newline && newline != c->querybuf && *(newline-1) == '\r') - newline--; + newline--, linefeed_chars++; /* Split the input buffer up to the \r\n */ querylen = newline-(c->querybuf); @@ -1103,7 +1140,7 @@ int processInlineBuffer(client *c) { c->repl_ack_time = server.unixtime; /* Leave data after the first line of the query in the buffer */ - sdsrange(c->querybuf,querylen+2,-1); + sdsrange(c->querybuf,querylen+linefeed_chars,-1); /* Setup argv array on client structure */ if (argc) { @@ -1899,10 +1936,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { - unsigned long list_item_size = sizeof(listNode)+5; - /* The +5 above means we assume an sds16 hdr, may not be true - * but is not going to be a problem. */ - + unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); return c->reply_bytes + (list_item_size*listLength(c->reply)); } diff --git a/src/notify.c b/src/notify.c index 6dd72f0a6..1afb36fc0 100644 --- a/src/notify.c +++ b/src/notify.c @@ -29,8 +29,8 @@ #include "server.h" -/* This file implements keyspace events notification via Pub/Sub ad - * described at http://redis.io/topics/keyspace-events. */ +/* This file implements keyspace events notification via Pub/Sub and + * described at https://redis.io/topics/notifications. */ /* Turn a string representing notification classes into an integer * representing notification classes flags xored. 
diff --git a/src/replication.c b/src/replication.c index 0adef8aec..6d589c012 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2148,6 +2148,8 @@ void replicationCacheMaster(client *c) { server.master->read_reploff = server.master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); + c->sentlen = 0; + c->reply_bytes = 0; c->bufpos = 0; resetClient(c); diff --git a/src/scripting.c b/src/scripting.c index 857859e70..328e3d681 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -575,9 +575,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { reply = sdsnewlen(c->buf,c->bufpos); c->bufpos = 0; while(listLength(c->reply)) { - sds o = listNodeValue(listFirst(c->reply)); + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); - reply = sdscatsds(reply,o); + reply = sdscatlen(reply,o->buf,o->used); listDelNode(c->reply,listFirst(c->reply)); } } @@ -67,8 +67,10 @@ static inline char sdsReqType(size_t string_size) { #if (LONG_MAX == LLONG_MAX) if (string_size < 1ll<<32) return SDS_TYPE_32; -#endif return SDS_TYPE_64; +#else + return SDS_TYPE_32; +#endif } /* Create a new sds string with the content specified by the 'init' pointer diff --git a/src/server.c b/src/server.c index ea61d15ad..dedaebdbe 100644 --- a/src/server.c +++ b/src/server.c @@ -2449,8 +2449,13 @@ int processCommand(client *c) { c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); - addReplyErrorFormat(c,"unknown command '%s'", - (char*)c->argv[0]->ptr); + sds args = sdsempty(); + int i; + for (i=1; i < c->argc && sdslen(args) < 128; i++) + args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); + addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", + (char*)c->argv[0]->ptr, args); + sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { @@ -3102,6 +3107,11 @@ sds genRedisInfoString(char *section) { 
"rss_overhead_bytes:%zu\r\n" "mem_fragmentation_ratio:%.2f\r\n" "mem_fragmentation_bytes:%zu\r\n" + "mem_not_counted_for_evict:%zu\r\n" + "mem_replication_backlog:%zu\r\n" + "mem_clients_slaves:%zu\r\n" + "mem_clients_normal:%zu\r\n" + "mem_aof_buffer:%zu\r\n" "mem_allocator:%s\r\n" "active_defrag_running:%d\r\n" "lazyfree_pending_objects:%zu\r\n", @@ -3134,6 +3144,11 @@ sds genRedisInfoString(char *section) { mh->rss_extra_bytes, mh->total_frag, /* this is the total RSS overhead, including fragmentation, */ mh->total_frag_bytes, /* named so for backwards compatibility */ + freeMemoryGetNotCountedMemory(), + mh->repl_backlog, + mh->clients_slaves, + mh->clients_normal, + mh->aof_buffer, ZMALLOC_LIB, server.active_defrag_running, lazyfreeGetPendingObjectsCount() diff --git a/src/server.h b/src/server.h index f3df6b46b..dae7561d8 100644 --- a/src/server.h +++ b/src/server.h @@ -619,6 +619,13 @@ typedef struct redisObject { struct evictionPoolEntry; /* Defined in evict.c */ +/* This structure is used in order to represent the output buffer of a client, + * which is actually a linked list of blocks like that, that is: client->reply. */ +typedef struct clientReplyBlock { + size_t size, used; + char buf[]; +} clientReplyBlock; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. 
*/ @@ -1423,6 +1430,7 @@ void addReplySubcommandSyntaxError(client *c); void copyClientOutputBuffer(client *dst, client *src); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); +void freeClientReplyValue(void *o); void *dupClientReplyValue(void *o); void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer); @@ -1664,6 +1672,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec); /* Core functions */ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level); +size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(void); int processCommand(client *c); void setupSignalHandlers(void); diff --git a/src/stream.h b/src/stream.h index 61210f952..ef08753b5 100644 --- a/src/stream.h +++ b/src/stream.h @@ -108,5 +108,6 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); +int streamCompareID(streamID *a, streamID *b); #endif diff --git a/src/t_stream.c b/src/t_stream.c index 54d6b0d1f..201509c7e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -705,10 +705,19 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { /* Change the valid/deleted entries count in the master entry. */ unsigned char *p = lpFirst(lp); aux = lpGetInteger(p); - lp = lpReplaceInteger(lp,&p,aux-1); - p = lpNext(lp,p); /* Seek deleted field. */ - aux = lpGetInteger(p); - lp = lpReplaceInteger(lp,&p,aux+1); + + if (aux == 1) { + /* If this is the last element in the listpack, we can remove the whole + * node. */ + lpFree(lp); + raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); + } else { + /* In the base case we alter the counters of valid/deleted entries. */ + lp = lpReplaceInteger(lp,&p,aux-1); + p = lpNext(lp,p); /* Seek deleted field. 
*/ + aux = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,aux+1); + } /* Update the number of entries counter. */ si->stream->length--; @@ -1342,6 +1351,14 @@ void xreadCommand(client *c) { } if (strcmp(c->argv[i]->ptr,"$") == 0) { + if (xreadgroup) { + addReplyError(c,"The $ ID is meaningless in the context of " + "XREADGROUP: you want to read the history of " + "this consumer by specifying a proper ID, or " + "use the > ID to get new messages. The $ ID would " + "just return an empty result set."); + goto cleanup; + } if (o) { stream *s = o->ptr; ids[id_idx] = s->last_id; @@ -1351,7 +1368,7 @@ void xreadCommand(client *c) { } continue; } else if (strcmp(c->argv[i]->ptr,">") == 0) { - if (!xreadgroup || groupname == NULL) { + if (!xreadgroup) { addReplyError(c,"The > ID can be specified only when calling " "XREADGROUP using the GROUP <group> " "<consumer> option."); @@ -1392,9 +1409,7 @@ void xreadCommand(client *c) { * synchronously in case the group top item delivered is smaller * than what the stream has inside. */ streamID *last = &groups[i]->last_id; - if (s->last_id.ms > last->ms || - (s->last_id.ms == last->ms && s->last_id.seq > last->seq)) - { + if (streamCompareID(&s->last_id, last) > 0) { serve_synchronously = 1; *gt = *last; } @@ -1402,9 +1417,7 @@ void xreadCommand(client *c) { } else { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ - if (s->last_id.ms > gt->ms || - (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) - { + if (streamCompareID(&s->last_id, gt) > 0) { serve_synchronously = 1; } } @@ -2128,9 +2141,13 @@ void xdelCommand(client *c) { streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ deleted += streamDeleteItem(s,&id); } - signalModifiedKey(c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); - server.dirty += deleted; + + /* Propagate the write if needed. 
*/ + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); + server.dirty += deleted; + } addReplyLongLong(c,deleted); } @@ -2279,18 +2296,20 @@ NULL raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { streamCG *cg = ri.data; - addReplyMultiBulkLen(c,6); + addReplyMultiBulkLen(c,8); addReplyStatus(c,"name"); addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyStatus(c,"consumers"); addReplyLongLong(c,raxSize(cg->consumers)); addReplyStatus(c,"pending"); addReplyLongLong(c,raxSize(cg->pel)); + addReplyStatus(c,"last-delivered-id"); + addReplyStreamID(c,&cg->last_id); } raxStop(&ri); } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { /* XINFO STREAM <key> (or the alias XINFO <key>). */ - addReplyMultiBulkLen(c,12); + addReplyMultiBulkLen(c,14); addReplyStatus(c,"length"); addReplyLongLong(c,s->length); addReplyStatus(c,"radix-tree-keys"); @@ -2299,6 +2318,8 @@ NULL addReplyLongLong(c,s->rax->numnodes); addReplyStatus(c,"groups"); addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); + addReplyStatus(c,"last-generated-id"); + addReplyStreamID(c,&s->last_id); /* To emit the first/last entry we us the streamReplyWithRange() * API. 
*/ diff --git a/src/zmalloc.c index 4956f905f..0d9917510 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -182,6 +182,9 @@ size_t zmalloc_size(void *ptr) { if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1)); return size+PREFIX_SIZE; } +size_t zmalloc_usable(void *ptr) { + return zmalloc_size(ptr)-PREFIX_SIZE; +} #endif void zfree(void *ptr) { diff --git a/src/zmalloc.h b/src/zmalloc.h index 49b33b883..9c9229907 100644 --- a/src/zmalloc.h +++ b/src/zmalloc.h @@ -98,6 +98,9 @@ void *zmalloc_no_tcache(size_t size); #ifndef HAVE_MALLOC_SIZE size_t zmalloc_size(void *ptr); +size_t zmalloc_usable(void *ptr); +#else +#define zmalloc_usable(p) zmalloc_size(p) #endif #endif /* __ZMALLOC_H */ diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 0c3f6b32c..e3cd11114 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -142,3 +142,95 @@ start_server {tags {"maxmemory"}} { } } } + +proc test_slave_buffers {cmd_count payload_len limit_memory pipeline} { + start_server {tags {"maxmemory"}} { + start_server {} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + + # add 100 keys of 100k (10MB total) + for {set j 0} {$j < 100} {incr j} { + $master setrange "key:$j" 100000 asdf + } + + $master config set maxmemory-policy allkeys-random + $master config set client-output-buffer-limit "slave 100000000 100000000 60" + $master config set repl-backlog-size [expr {10*1024}] + + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." 
+ } + + # measure used memory after the slave connected and set maxmemory + set orig_used [s -1 used_memory] + set orig_client_buf [s -1 mem_clients_normal] + set orig_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict] + set orig_used_no_repl [expr {$orig_used - $orig_mem_not_counted_for_evict}] + set limit [expr {$orig_used - $orig_mem_not_counted_for_evict + 20*1024}] + + if {$limit_memory==1} { + $master config set maxmemory $limit + } + + # put the slave to sleep + set rd_slave [redis_deferring_client] + $rd_slave debug sleep 60 + + # send some 10mb worth of commands that don't increase the memory usage + if {$pipeline == 1} { + set rd_master [redis_deferring_client -1] + for {set k 0} {$k < $cmd_count} {incr k} { + $rd_master setrange key:0 0 [string repeat A $payload_len] + } + for {set k 0} {$k < $cmd_count} {incr k} { + #$rd_master read + } + } else { + for {set k 0} {$k < $cmd_count} {incr k} { + $master setrange key:0 0 [string repeat A $payload_len] + } + } + + set new_used [s -1 used_memory] + set slave_buf [s -1 mem_clients_slaves] + set client_buf [s -1 mem_clients_normal] + set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict] + set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}] + set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}] + + assert {[$master dbsize] == 100} + assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers + assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + + $master client kill type slave + set killed_used [s -1 used_memory] + set killed_slave_buf [s -1 mem_clients_slaves] + set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict] + set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] + set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] + assert {$killed_slave_buf == 0} + assert {$delta_no_repl > -50*1024 && 
$delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + } + } +} + +test {slave buffer are counted correctly} { + # we wanna use many small commands, and we don't wanna wait long + # so we need to use a pipeline (redis_deferring_client) + # that may cause query buffer to fill and induce eviction, so we disable it + test_slave_buffers 1000000 10 0 1 +} + +test {slave buffer don't induce eviction} { + # test again with fewer (and bigger) commands without pipeline, but with eviction + test_slave_buffers 100000 100 1 0 +} + |