diff options
author | Loïc Hoguin <lhoguin@vmware.com> | 2023-04-25 09:50:27 +0200 |
---|---|---|
committer | Loïc Hoguin <lhoguin@vmware.com> | 2023-04-25 09:50:27 +0200 |
commit | d3aa298bec0f1324e0b42d10338166d450bccb6e (patch) | |
tree | bd7cdf5bc41793419d4a829d276c44d2e3cafc6b | |
parent | c094bbb8288beb0b0184f9d7887cee396da21fff (diff) | |
download | rabbitmq-server-git-d3aa298bec0f1324e0b42d10338166d450bccb6e.tar.gz |
CQv1: Don't limit messages in memory based on consume rate
The v1 index is not optimised for reading messages except when
the entire segment is read. So we always do that.
This change was made because when the read is inefficient and
TTL is used the queue can get unresponsive while getting the
TTL messages dropped. In that case the queue may drop messages
slower than they expire and as a result will not process any
Erlang messages until it has dropped all messages in the queue.
-rw-r--r-- | deps/rabbit/src/rabbit_variable_queue.erl | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 9cf3af8716..90c68c1f7d 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -2601,14 +2601,22 @@ maybe_deltas_to_betas(DelsAndAcksFun, count = DeltaCount, transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, + %% For v1 we always want to read messages up to the next segment boundary. + %% This is because v1 is not optimised for multiple reads from the same + %% segment: every time we read messages from a segment it has to read + %% and parse the entire segment from disk, filtering the messages we + %% requested afterwards. + %% + %% For v2 we want to limit the number of messages read at once to lower + %% the memory footprint. We use the consume rate to determine how many + %% messages we read. + DeltaSeqLimit = case Version of + 1 -> DeltaSeqIdEnd; + 2 -> DeltaSeqId + MemoryLimit + end, DeltaSeqId1 = lists:min([IndexMod:next_segment_boundary(DeltaSeqId), - %% We must limit the number of messages read at once - %% otherwise the queue will attempt to read up to segment_entry_count() - %% messages from the index each time. The value is determined - %% using the consuming rate. - DeltaSeqId + MemoryLimit, - DeltaSeqIdEnd]), + DeltaSeqLimit, DeltaSeqIdEnd]), {List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState), {List, StoreState2} = case Version of 1 -> {List0, StoreState}; |