summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/listpack.c22
-rw-r--r--src/listpack.h4
-rw-r--r--src/listpack_malloc.h1
-rw-r--r--src/t_stream.c25
4 files changed, 44 insertions, 8 deletions
diff --git a/src/listpack.c b/src/listpack.c
index b403a1200..a2255f0d7 100644
--- a/src/listpack.c
+++ b/src/listpack.c
@@ -219,9 +219,12 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
}
/* Create a new, empty listpack.
- * On success the new listpack is returned, otherwise an error is returned. */
-unsigned char *lpNew(void) {
- unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
+ * On success the new listpack is returned, otherwise an error is returned.
+ * Pre-allocate at least `capacity` bytes of memory,
+ * over-allocated memory can be shrinked by `lpShrinkToFit`.
+ * */
+unsigned char *lpNew(size_t capacity) {
+ unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
lpSetNumElements(lp,0);
@@ -234,6 +237,16 @@ void lpFree(unsigned char *lp) {
lp_free(lp);
}
+/* Shrink the memory to fit. */
+unsigned char* lpShrinkToFit(unsigned char *lp) {
+ size_t size = lpGetTotalBytes(lp);
+ if (size < lp_malloc_size(lp)) {
+ return lp_realloc(lp, size);
+ } else {
+ return lp;
+ }
+}
+
/* Given an element 'ele' of size 'size', determine if the element can be
* represented inside the listpack encoded as integer, and returns
* LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer
@@ -702,7 +715,8 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, un
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Realloc before: we need more room. */
- if (new_listpack_bytes > old_listpack_bytes) {
+ if (new_listpack_bytes > old_listpack_bytes &&
+ new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
diff --git a/src/listpack.h b/src/listpack.h
index e8375628b..f87622c18 100644
--- a/src/listpack.h
+++ b/src/listpack.h
@@ -35,6 +35,7 @@
#ifndef __LISTPACK_H
#define __LISTPACK_H
+#include <stdlib.h>
#include <stdint.h>
#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
@@ -44,8 +45,9 @@
#define LP_AFTER 1
#define LP_REPLACE 2
-unsigned char *lpNew(void);
+unsigned char *lpNew(size_t capacity);
void lpFree(unsigned char *lp);
+unsigned char* lpShrinkToFit(unsigned char *lp);
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
diff --git a/src/listpack_malloc.h b/src/listpack_malloc.h
index 401ab6f74..3a9050052 100644
--- a/src/listpack_malloc.h
+++ b/src/listpack_malloc.h
@@ -42,4 +42,5 @@
#define lp_malloc zmalloc
#define lp_realloc zrealloc
#define lp_free zfree
+#define lp_malloc_size zmalloc_usable_size
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index 7b4ffe3c4..da43fce18 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -43,6 +43,10 @@
* avoid malloc allocation.*/
#define STREAMID_STATIC_VECTOR_LEN 8
+/* Max pre-allocation for listpack. This is done to avoid abuse of a user
+ * setting stream_node_max_bytes to a huge number. */
+#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
+
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
@@ -509,7 +513,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
lp = NULL;
} else if (server.stream_node_max_entries) {
int64_t count = lpGetInteger(lpFirst(lp));
- if (count >= server.stream_node_max_entries) lp = NULL;
+ if (count >= server.stream_node_max_entries) {
+ /* Shrink extra pre-allocated memory */
+ lp = lpShrinkToFit(lp);
+ if (ri.data != lp)
+ raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
+ lp = NULL;
+ }
}
}
@@ -517,8 +527,17 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
if (lp == NULL) {
master_id = id;
streamEncodeID(rax_key,&id);
- /* Create the listpack having the master entry ID and fields. */
- lp = lpNew();
+ /* Create the listpack having the master entry ID and fields.
+ * Pre-allocate some bytes when creating listpack to avoid realloc on
+ * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
+ * and won't realloc on every XADD.
+ * When listpack reaches max number of entries, we'll shrink the
+ * allocation to fit the data. */
+ size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
+ if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
+ prealloc = server.stream_node_max_bytes;
+ }
+ lp = lpNew(prealloc);
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
lp = lpAppendInteger(lp,numfields);