summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/aof.c2
-rw-r--r--src/blocked.c5
-rw-r--r--src/dict.c6
-rw-r--r--src/module.c4
-rw-r--r--src/networking.c138
-rw-r--r--src/notify.c4
-rw-r--r--src/replication.c2
-rw-r--r--src/scripting.c4
-rw-r--r--src/sds.c4
-rw-r--r--src/server.c19
-rw-r--r--src/server.h9
-rw-r--r--src/stream.h1
-rw-r--r--src/t_stream.c53
-rw-r--r--src/zmalloc.c3
-rw-r--r--src/zmalloc.h3
-rw-r--r--tests/unit/maxmemory.tcl92
16 files changed, 265 insertions, 84 deletions
diff --git a/src/aof.c b/src/aof.c
index 2d5f259fb..be416ec4e 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -645,7 +645,7 @@ struct client *createFakeClient(void) {
c->obuf_soft_limit_reached_time = 0;
c->watched_keys = listCreate();
c->peerid = NULL;
- listSetFreeMethod(c->reply,decrRefCountVoid);
+ listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
initClientMultiState(c);
return c;
diff --git a/src/blocked.c b/src/blocked.c
index d8ae596d9..4a667501f 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -397,10 +397,7 @@ void handleClientsBlockedOnKeys(void) {
}
}
- if (s->last_id.ms > gt->ms ||
- (s->last_id.ms == gt->ms &&
- s->last_id.seq > gt->seq))
- {
+ if (streamCompareID(&s->last_id, gt) > 0) {
streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */
diff --git a/src/dict.c b/src/dict.c
index 738b38c46..2cf9d4839 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -705,8 +705,10 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
* table, there will be no elements in both tables up to
* the current rehashing index, so we jump if possible.
* (this happens when going from big to small table). */
- if (i >= d->ht[1].size) i = d->rehashidx;
- continue;
+ if (i >= d->ht[1].size)
+ i = d->rehashidx;
+ else
+ continue;
}
if (i >= d->ht[j].size) continue; /* Out of range for this table. */
dictEntry *he = d->ht[j].table[i];
diff --git a/src/module.c b/src/module.c
index a46b86a50..9809cd74e 100644
--- a/src/module.c
+++ b/src/module.c
@@ -2712,9 +2712,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
sds proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0;
while(listLength(c->reply)) {
- sds o = listNodeValue(listFirst(c->reply));
+ clientReplyBlock *o = listNodeValue(listFirst(c->reply));
- proto = sdscatsds(proto,o);
+ proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
}
reply = moduleCreateCallReplyFromProto(ctx,proto);
diff --git a/src/networking.c b/src/networking.c
index 58b553fe6..22850a4b6 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
/* Client.reply list dup and free methods. */
void *dupClientReplyValue(void *o) {
- return sdsdup(o);
+ clientReplyBlock *old = o;
+ clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
+ memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
+ return buf;
}
void freeClientReplyValue(void *o) {
- sdsfree(o);
+ zfree(o);
}
int listMatchObjects(void *a, void *b) {
@@ -240,25 +243,35 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
void _addReplyStringToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
- if (listLength(c->reply) == 0) {
- sds node = sdsnewlen(s,len);
- listAddNodeTail(c->reply,node);
- c->reply_bytes += len;
- } else {
- listNode *ln = listLast(c->reply);
- sds tail = listNodeValue(ln);
-
- /* Append to this object when possible. If tail == NULL it was
- * set via addDeferredMultiBulkLength(). */
- if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) {
- tail = sdscatlen(tail,s,len);
- listNodeValue(ln) = tail;
- c->reply_bytes += len;
- } else {
- sds node = sdsnewlen(s,len);
- listAddNodeTail(c->reply,node);
- c->reply_bytes += len;
- }
+ listNode *ln = listLast(c->reply);
+ clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
+
+ /* Note that 'tail' may be NULL even if we have a tail node, because when
+ * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
+ * to fill it later, when the size of the bulk length is set. */
+
+ /* Append to tail string when possible. */
+ if (tail) {
+ /* Copy the part we can fit into the tail, and leave the rest for a
+ * new node */
+ size_t avail = tail->size - tail->used;
+ size_t copy = avail >= len? len: avail;
+ memcpy(tail->buf + tail->used, s, copy);
+ tail->used += copy;
+ s += copy;
+ len -= copy;
+ }
+ if (len) {
+ /* Create a new node, make sure it is allocated to at
+ * least PROTO_REPLY_CHUNK_BYTES */
+ size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
+ tail = zmalloc(size + sizeof(clientReplyBlock));
+ /* take over the allocation's internal fragmentation */
+ tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
+ tail->used = len;
+ memcpy(tail->buf, s, len);
+ listAddNodeTail(c->reply, tail);
+ c->reply_bytes += tail->size;
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
@@ -329,11 +342,18 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
if (!len || s[0] != '-') addReplyString(c,"-ERR ",5);
addReplyString(c,s,len);
addReplyString(c,"\r\n",2);
- if (c->flags & CLIENT_MASTER) {
+ if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
+ char* to = c->flags & CLIENT_MASTER? "master": "slave";
+ char* from = c->flags & CLIENT_MASTER? "slave": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
- serverLog(LL_WARNING,"== CRITICAL == This slave is sending an error "
- "to its master: '%s' after processing the command "
- "'%s'", s, cmdname);
+ serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
+ "to its %s: '%s' after processing the command "
+ "'%s'", from, to, s, cmdname);
+ /* Here we want to panic because when an instance is sending an
+ * error to another instance in the context of replication, this can
+ * only create some kind of offset or data desynchronization. Better
+ * to catch it ASAP and crash instead of continuing. */
+ serverPanic("Continuing is unsafe: replication protocol violation.");
}
}
@@ -390,26 +410,41 @@ void *addDeferredMultiBulkLength(client *c) {
/* Populate the length object and try gluing it to the next chunk. */
void setDeferredMultiBulkLength(client *c, void *node, long length) {
listNode *ln = (listNode*)node;
- sds len, next;
+ clientReplyBlock *next;
+ char lenstr[128];
+ size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
/* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addDeferredMultiBulkLength() */
if (node == NULL) return;
-
- len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length);
- listNodeValue(ln) = len;
- c->reply_bytes += sdslen(len);
- if (ln->next != NULL) {
- next = listNodeValue(ln->next);
-
- /* Only glue when the next node is non-NULL (an sds in this case) */
- if (next != NULL) {
- len = sdscatsds(len,next);
- listDelNode(c->reply,ln->next);
- listNodeValue(ln) = len;
- /* No need to update c->reply_bytes: we are just moving the same
- * amount of bytes from one node to another. */
- }
+ serverAssert(!listNodeValue(ln));
+
+ /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(),
+ * with a new buffer structure containing the protocol needed to specify
+ * the length of the array following. However sometimes when there is
+ * little memory to move, we may instead remove this NULL node, and prefix
+ * our protocol in the node immediately after it, in order to save a
+ * write(2) syscall later. Conditions needed to do it:
+ *
+ * - The next node is non-NULL,
+ * - It has enough room already allocated
+ * - And not too large (avoid large memmove) */
+ if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
+ next->size - next->used >= lenstr_len &&
+ next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
+ memmove(next->buf + lenstr_len, next->buf, next->used);
+ memcpy(next->buf, lenstr, lenstr_len);
+ next->used += lenstr_len;
+ listDelNode(c->reply,ln);
+ } else {
+ /* Create a new node */
+ clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
+ /* Take over the allocation's internal fragmentation */
+ buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock);
+ buf->used = lenstr_len;
+ memcpy(buf->buf, lenstr, lenstr_len);
+ listNodeValue(ln) = buf;
+ c->reply_bytes += buf->size;
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
@@ -580,6 +615,7 @@ void addReplySubcommandSyntaxError(client *c) {
* destination client. */
void copyClientOutputBuffer(client *dst, client *src) {
listRelease(dst->reply);
+ dst->sentlen = 0;
dst->reply = listDup(src->reply);
memcpy(dst->buf,src->buf,src->bufpos);
dst->bufpos = src->bufpos;
@@ -895,7 +931,7 @@ client *lookupClientByID(uint64_t id) {
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
- sds o;
+ clientReplyBlock *o;
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
@@ -912,23 +948,24 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
} else {
o = listNodeValue(listFirst(c->reply));
- objlen = sdslen(o);
+ objlen = o->used;
if (objlen == 0) {
+ c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
- nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
+ nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
+ c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
- c->reply_bytes -= objlen;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
@@ -1065,7 +1102,7 @@ void resetClient(client *c) {
* with the error and close the connection. */
int processInlineBuffer(client *c) {
char *newline;
- int argc, j;
+ int argc, j, linefeed_chars = 1;
sds *argv, aux;
size_t querylen;
@@ -1083,7 +1120,7 @@ int processInlineBuffer(client *c) {
/* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
- newline--;
+ newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf);
@@ -1103,7 +1140,7 @@ int processInlineBuffer(client *c) {
c->repl_ack_time = server.unixtime;
/* Leave data after the first line of the query in the buffer */
- sdsrange(c->querybuf,querylen+2,-1);
+ sdsrange(c->querybuf,querylen+linefeed_chars,-1);
/* Setup argv array on client structure */
if (argc) {
@@ -1899,10 +1936,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(client *c) {
- unsigned long list_item_size = sizeof(listNode)+5;
- /* The +5 above means we assume an sds16 hdr, may not be true
- * but is not going to be a problem. */
-
+ unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
diff --git a/src/notify.c b/src/notify.c
index 6dd72f0a6..1afb36fc0 100644
--- a/src/notify.c
+++ b/src/notify.c
@@ -29,8 +29,8 @@
#include "server.h"
-/* This file implements keyspace events notification via Pub/Sub ad
- * described at http://redis.io/topics/keyspace-events. */
+/* This file implements keyspace events notification via Pub/Sub as
+ * described at https://redis.io/topics/notifications. */
/* Turn a string representing notification classes into an integer
* representing notification classes flags xored.
diff --git a/src/replication.c b/src/replication.c
index 0adef8aec..6d589c012 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -2148,6 +2148,8 @@ void replicationCacheMaster(client *c) {
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
+ c->sentlen = 0;
+ c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
diff --git a/src/scripting.c b/src/scripting.c
index 857859e70..328e3d681 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -575,9 +575,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
reply = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0;
while(listLength(c->reply)) {
- sds o = listNodeValue(listFirst(c->reply));
+ clientReplyBlock *o = listNodeValue(listFirst(c->reply));
- reply = sdscatsds(reply,o);
+ reply = sdscatlen(reply,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
}
}
diff --git a/src/sds.c b/src/sds.c
index c4f43603d..39ad595ed 100644
--- a/src/sds.c
+++ b/src/sds.c
@@ -67,8 +67,10 @@ static inline char sdsReqType(size_t string_size) {
#if (LONG_MAX == LLONG_MAX)
if (string_size < 1ll<<32)
return SDS_TYPE_32;
-#endif
return SDS_TYPE_64;
+#else
+ return SDS_TYPE_32;
+#endif
}
/* Create a new sds string with the content specified by the 'init' pointer
diff --git a/src/server.c b/src/server.c
index ea61d15ad..dedaebdbe 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2449,8 +2449,13 @@ int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
- addReplyErrorFormat(c,"unknown command '%s'",
- (char*)c->argv[0]->ptr);
+ sds args = sdsempty();
+ int i;
+ for (i=1; i < c->argc && sdslen(args) < 128; i++)
+ args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
+ addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
+ (char*)c->argv[0]->ptr, args);
+ sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
@@ -3102,6 +3107,11 @@ sds genRedisInfoString(char *section) {
"rss_overhead_bytes:%zu\r\n"
"mem_fragmentation_ratio:%.2f\r\n"
"mem_fragmentation_bytes:%zu\r\n"
+ "mem_not_counted_for_evict:%zu\r\n"
+ "mem_replication_backlog:%zu\r\n"
+ "mem_clients_slaves:%zu\r\n"
+ "mem_clients_normal:%zu\r\n"
+ "mem_aof_buffer:%zu\r\n"
"mem_allocator:%s\r\n"
"active_defrag_running:%d\r\n"
"lazyfree_pending_objects:%zu\r\n",
@@ -3134,6 +3144,11 @@ sds genRedisInfoString(char *section) {
mh->rss_extra_bytes,
mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
mh->total_frag_bytes, /* named so for backwards compatibility */
+ freeMemoryGetNotCountedMemory(),
+ mh->repl_backlog,
+ mh->clients_slaves,
+ mh->clients_normal,
+ mh->aof_buffer,
ZMALLOC_LIB,
server.active_defrag_running,
lazyfreeGetPendingObjectsCount()
diff --git a/src/server.h b/src/server.h
index f3df6b46b..dae7561d8 100644
--- a/src/server.h
+++ b/src/server.h
@@ -619,6 +619,13 @@ typedef struct redisObject {
struct evictionPoolEntry; /* Defined in evict.c */
+/* This structure is used in order to represent the output buffer of a client,
+ * which is actually a linked list of blocks like that, that is: client->reply. */
+typedef struct clientReplyBlock {
+ size_t size, used;
+ char buf[];
+} clientReplyBlock;
+
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@@ -1423,6 +1430,7 @@ void addReplySubcommandSyntaxError(client *c);
void copyClientOutputBuffer(client *dst, client *src);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
+void freeClientReplyValue(void *o);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
@@ -1664,6 +1672,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
/* Core functions */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
+size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void);
int processCommand(client *c);
void setupSignalHandlers(void);
diff --git a/src/stream.h b/src/stream.h
index 61210f952..ef08753b5 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -108,5 +108,6 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
+int streamCompareID(streamID *a, streamID *b);
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index 54d6b0d1f..201509c7e 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -705,10 +705,19 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* Change the valid/deleted entries count in the master entry. */
unsigned char *p = lpFirst(lp);
aux = lpGetInteger(p);
- lp = lpReplaceInteger(lp,&p,aux-1);
- p = lpNext(lp,p); /* Seek deleted field. */
- aux = lpGetInteger(p);
- lp = lpReplaceInteger(lp,&p,aux+1);
+
+ if (aux == 1) {
+ /* If this is the last element in the listpack, we can remove the whole
+ * node. */
+ lpFree(lp);
+ raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL);
+ } else {
+ /* In the base case we alter the counters of valid/deleted entries. */
+ lp = lpReplaceInteger(lp,&p,aux-1);
+ p = lpNext(lp,p); /* Seek deleted field. */
+ aux = lpGetInteger(p);
+ lp = lpReplaceInteger(lp,&p,aux+1);
+ }
/* Update the number of entries counter. */
si->stream->length--;
@@ -1342,6 +1351,14 @@ void xreadCommand(client *c) {
}
if (strcmp(c->argv[i]->ptr,"$") == 0) {
+ if (xreadgroup) {
+ addReplyError(c,"The $ ID is meaningless in the context of "
+ "XREADGROUP: you want to read the history of "
+ "this consumer by specifying a proper ID, or "
+ "use the > ID to get new messages. The $ ID would "
+ "just return an empty result set.");
+ goto cleanup;
+ }
if (o) {
stream *s = o->ptr;
ids[id_idx] = s->last_id;
@@ -1351,7 +1368,7 @@ void xreadCommand(client *c) {
}
continue;
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
- if (!xreadgroup || groupname == NULL) {
+ if (!xreadgroup) {
addReplyError(c,"The > ID can be specified only when calling "
"XREADGROUP using the GROUP <group> "
"<consumer> option.");
@@ -1392,9 +1409,7 @@ void xreadCommand(client *c) {
* synchronously in case the group top item delivered is smaller
* than what the stream has inside. */
streamID *last = &groups[i]->last_id;
- if (s->last_id.ms > last->ms ||
- (s->last_id.ms == last->ms && s->last_id.seq > last->seq))
- {
+ if (streamCompareID(&s->last_id, last) > 0) {
serve_synchronously = 1;
*gt = *last;
}
@@ -1402,9 +1417,7 @@ void xreadCommand(client *c) {
} else {
/* For consumers without a group, we serve synchronously if we can
* actually provide at least one item from the stream. */
- if (s->last_id.ms > gt->ms ||
- (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
- {
+ if (streamCompareID(&s->last_id, gt) > 0) {
serve_synchronously = 1;
}
}
@@ -2128,9 +2141,13 @@ void xdelCommand(client *c) {
streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
deleted += streamDeleteItem(s,&id);
}
- signalModifiedKey(c->db,c->argv[1]);
- notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
- server.dirty += deleted;
+
+ /* Propagate the write if needed. */
+ if (deleted) {
+ signalModifiedKey(c->db,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
+ server.dirty += deleted;
+ }
addReplyLongLong(c,deleted);
}
@@ -2279,18 +2296,20 @@ NULL
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
- addReplyMultiBulkLen(c,6);
+ addReplyMultiBulkLen(c,8);
addReplyStatus(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyStatus(c,"consumers");
addReplyLongLong(c,raxSize(cg->consumers));
addReplyStatus(c,"pending");
addReplyLongLong(c,raxSize(cg->pel));
+ addReplyStatus(c,"last-delivered-id");
+ addReplyStreamID(c,&cg->last_id);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */
- addReplyMultiBulkLen(c,12);
+ addReplyMultiBulkLen(c,14);
addReplyStatus(c,"length");
addReplyLongLong(c,s->length);
addReplyStatus(c,"radix-tree-keys");
@@ -2299,6 +2318,8 @@ NULL
addReplyLongLong(c,s->rax->numnodes);
addReplyStatus(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
+ addReplyStatus(c,"last-generated-id");
+ addReplyStreamID(c,&s->last_id);
/* To emit the first/last entry we us the streamReplyWithRange()
* API. */
diff --git a/src/zmalloc.c b/src/zmalloc.c
index 4956f905f..0d9917510 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -182,6 +182,9 @@ size_t zmalloc_size(void *ptr) {
if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1));
return size+PREFIX_SIZE;
}
+size_t zmalloc_usable(void *ptr) {
+ return zmalloc_size(ptr)-PREFIX_SIZE;
+}
#endif
void zfree(void *ptr) {
diff --git a/src/zmalloc.h b/src/zmalloc.h
index 49b33b883..9c9229907 100644
--- a/src/zmalloc.h
+++ b/src/zmalloc.h
@@ -98,6 +98,9 @@ void *zmalloc_no_tcache(size_t size);
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr);
+size_t zmalloc_usable(void *ptr);
+#else
+#define zmalloc_usable(p) zmalloc_size(p)
#endif
#endif /* __ZMALLOC_H */
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index 0c3f6b32c..e3cd11114 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -142,3 +142,95 @@ start_server {tags {"maxmemory"}} {
}
}
}
+
+proc test_slave_buffers {cmd_count payload_len limit_memory pipeline} {
+ start_server {tags {"maxmemory"}} {
+ start_server {} {
+ set slave [srv 0 client]
+ set slave_host [srv 0 host]
+ set slave_port [srv 0 port]
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+
+ # add 100 keys of 100k (10MB total)
+ for {set j 0} {$j < 100} {incr j} {
+ $master setrange "key:$j" 100000 asdf
+ }
+
+ $master config set maxmemory-policy allkeys-random
+ $master config set client-output-buffer-limit "slave 100000000 100000000 60"
+ $master config set repl-backlog-size [expr {10*1024}]
+
+ $slave slaveof $master_host $master_port
+ wait_for_condition 50 100 {
+ [s 0 master_link_status] eq {up}
+ } else {
+ fail "Replication not started."
+ }
+
+ # measure used memory after the slave connected and set maxmemory
+ set orig_used [s -1 used_memory]
+ set orig_client_buf [s -1 mem_clients_normal]
+ set orig_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
+ set orig_used_no_repl [expr {$orig_used - $orig_mem_not_counted_for_evict}]
+ set limit [expr {$orig_used - $orig_mem_not_counted_for_evict + 20*1024}]
+
+ if {$limit_memory==1} {
+ $master config set maxmemory $limit
+ }
+
+ # put the slave to sleep
+ set rd_slave [redis_deferring_client]
+ $rd_slave debug sleep 60
+
+ # send some 10mb worth of commands that don't increase the memory usage
+ if {$pipeline == 1} {
+ set rd_master [redis_deferring_client -1]
+ for {set k 0} {$k < $cmd_count} {incr k} {
+ $rd_master setrange key:0 0 [string repeat A $payload_len]
+ }
+ for {set k 0} {$k < $cmd_count} {incr k} {
+ #$rd_master read
+ }
+ } else {
+ for {set k 0} {$k < $cmd_count} {incr k} {
+ $master setrange key:0 0 [string repeat A $payload_len]
+ }
+ }
+
+ set new_used [s -1 used_memory]
+ set slave_buf [s -1 mem_clients_slaves]
+ set client_buf [s -1 mem_clients_normal]
+ set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
+ set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}]
+ set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}]
+
+ assert {[$master dbsize] == 100}
+ assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers
+ assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
+
+ $master client kill type slave
+ set killed_used [s -1 used_memory]
+ set killed_slave_buf [s -1 mem_clients_slaves]
+ set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
+ set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}]
+ set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
+ assert {$killed_slave_buf == 0}
+ assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
+ }
+ }
+}
+
+test {slave buffer are counted correctly} {
+ # we wanna use many small commands, and we don't wanna wait long
+ # so we need to use a pipeline (redis_deferring_client)
+ # that may cause query buffer to fill and induce eviction, so we disable it
+ test_slave_buffers 1000000 10 0 1
+}
+
+test {slave buffer don't induce eviction} {
+ # test again with fewer (and bigger) commands without pipeline, but with eviction
+ test_slave_buffers 100000 100 1 0
+}
+