summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2023-04-25 09:50:27 +0200
committerLoïc Hoguin <lhoguin@vmware.com>2023-04-25 09:50:27 +0200
commitd3aa298bec0f1324e0b42d10338166d450bccb6e (patch)
treebd7cdf5bc41793419d4a829d276c44d2e3cafc6b
parentc094bbb8288beb0b0184f9d7887cee396da21fff (diff)
downloadrabbitmq-server-git-d3aa298bec0f1324e0b42d10338166d450bccb6e.tar.gz
CQv1: Don't limit messages in memory based on consume rate
The v1 index is not optimised for reading messages except when the entire segment is read. So we always do that. This change was made because when the read is inefficient and TTL is used the queue can get unresponsive while getting the TTL messages dropped. In that case the queue may drop messages slower than they expire and as a result will not process any Erlang messages until it has dropped all messages in the queue.
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl20
1 files changed, 14 insertions, 6 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index 9cf3af8716..90c68c1f7d 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -2601,14 +2601,22 @@ maybe_deltas_to_betas(DelsAndAcksFun,
count = DeltaCount,
transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
+ %% For v1 we always want to read messages up to the next segment boundary.
+ %% This is because v1 is not optimised for multiple reads from the same
+ %% segment: every time we read messages from a segment it has to read
+ %% and parse the entire segment from disk, filtering the messages we
+ %% requested afterwards.
+ %%
+ %% For v2 we want to limit the number of messages read at once to lower
+ %% the memory footprint. We use the consume rate to determine how many
+ %% messages we read.
+ DeltaSeqLimit = case Version of
+ 1 -> DeltaSeqIdEnd;
+ 2 -> DeltaSeqId + MemoryLimit
+ end,
DeltaSeqId1 =
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
- %% We must limit the number of messages read at once
- %% otherwise the queue will attempt to read up to segment_entry_count()
- %% messages from the index each time. The value is determined
- %% using the consuming rate.
- DeltaSeqId + MemoryLimit,
- DeltaSeqIdEnd]),
+ DeltaSeqLimit, DeltaSeqIdEnd]),
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
{List, StoreState2} = case Version of
1 -> {List0, StoreState};