summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorMadelyn Olson <34459052+madolson@users.noreply.github.com>2023-03-07 15:06:53 -0800
committerGitHub <noreply@github.com>2023-03-07 15:06:53 -0800
commit2bb29e4aa375a76e81dfdcf6bbe418f0bf3461d1 (patch)
treedb246948c78e329fa8f518e6b1431a811fa82c43 /src/t_stream.c
parent9958ab8b2cd17681962f5de83e6a94cf4189e9f3 (diff)
downloadredis-2bb29e4aa375a76e81dfdcf6bbe418f0bf3461d1.tar.gz
Always compact nodes in stream listpacks after creating new nodes (#11885)
This change attempts to alleviate a minor memory usage degradation for Redis 6.2 and onwards when using rather large objects (~2k) in streams. Introduced in #6281, we pre-allocate the head nodes of a stream to be 4kb, to limit the amount of unnecessary initial reallocations that are done. However, if we only ever allocate one object because 2 objects exceeds the max_stream_entry_size, we never actually shrink it to fit the single item. This can lead to a lot of excessive memory usage. For smaller item sizes this becomes less of an issue, as the overhead decreases as the items become smaller in size. This commit also changes the MEMORY USAGE of streams, since it was reporting the lpBytes instead of the allocated size. This introduced an observability issue when diagnosing the memory issue, since Redis reported the same amount of used bytes pre and post change, even though the new implementation allocated more memory.
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c19
1 files changed, 11 insertions, 8 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 16ad044a2..398f8ba0b 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -530,22 +530,25 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
+ int new_node = 0;
size_t node_max_bytes = server.stream_node_max_bytes;
if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
if (lp_bytes + totelelen >= node_max_bytes) {
- lp = NULL;
+ new_node = 1;
} else if (server.stream_node_max_entries) {
unsigned char *lp_ele = lpFirst(lp);
/* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
- if (count >= server.stream_node_max_entries) {
- /* Shrink extra pre-allocated memory */
- lp = lpShrinkToFit(lp);
- if (ri.data != lp)
- raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
- lp = NULL;
- }
+ if (count >= server.stream_node_max_entries) new_node = 1;
+ }
+
+ if (new_node) {
+ /* Shrink extra pre-allocated memory */
+ lp = lpShrinkToFit(lp);
+ if (ri.data != lp)
+ raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
+ lp = NULL;
}
}