diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-17 12:47:09 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-17 12:47:09 +0100 |
commit | bcc19963081e7abfcc205840b73fed4d5784ab1a (patch) | |
tree | ef5b4d926cd8fe4d46a46ef5597bdb6b47761d9f | |
parent | a97c0c40d238a1ac81d2f08c8ecd5bba1fb310b7 (diff) | |
download | rabbitmq-server-bcc19963081e7abfcc205840b73fed4d5784ab1a.tar.gz |
Having thought about and discussed the code and desires, change things around a bit.
-rw-r--r-- | src/rabbit_variable_queue.erl | 73 |
1 files changed, 26 insertions, 47 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 131d90cb..37e822da 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -136,24 +136,14 @@ %% less and gammas and betas more, which will cost more memory, but %% require fewer disk ops and less CPU overhead. %% -%% The decision taken is that once a gamma is at the head of q2, or at -%% the tail of q3, it'll be immediately rolled into delta. This -%% ensures that the memory use of a queue stays relatively flat, even -%% as it gets longer: we avoid a cliff face where an event forces -%% potentially millions of gammas to be suddenly rolled into -%% delta. This comes at the cost of needing to do additional reads -%% from queue_index as gammas are eventually brought out of delta. -%% -%% However, that does not yet explain when a beta is converted to a -%% gamma. In the case of a persistent msg published to a durable -%% queue, the msg is immediately written to the msg_store and -%% queue_index. If then additionally converted from an alpha, it'll -%% immediately go to a gamma (as it's already in queue_index), and -%% cannot exist as a beta. Thus a durable queue with a mixture of -%% persistent and transient msgs in it which has more messages than -%% permitted by the target_ram_count may contain an interspersed -%% mixture of betas and gammas in q2 and q3, but with a beta at the -%% head of q2 and the tail of q3. +%% In the case of a persistent msg published to a durable queue, the +%% msg is immediately written to the msg_store and queue_index. If +%% then additionally converted from an alpha, it'll immediately go to +%% a gamma (as it's already in queue_index), and cannot exist as a +%% beta. Thus a durable queue with a mixture of persistent and +%% transient msgs in it which has more messages than permitted by the +%% target_ram_count may contain an interspersed mixture of betas and +%% gammas in q2 and q3. %% %% There is then a ratio that controls how many betas and gammas there %% can be. This is based on the target_ram_count and thus expresses @@ -161,8 +151,8 @@ %% so should the number of betas and gammas fall (i.e. delta %% grows). If q2 and q3 contain more than the permitted number of %% betas and gammas, then the surplus are forcibly converted to gammas -%% (as necessary) and then rolled into delta. The ratio is that the -%% size of delta / (betas+gammas+delta) should equal +%% (as necessary) and then rolled into delta. The ratio is that +%% delta/(betas+gammas+delta) equals %% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as %% the target_ram_count shrinks to 0, so must betas and gammas. %% @@ -1489,15 +1479,19 @@ reduce_memory_use(State) -> State), State1. +permitted_beta_count(#vqstate { target_ram_count = 0, + q3 = Q3 }) -> + lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); permitted_beta_count(#vqstate { len = 0 }) -> infinity; -permitted_beta_count(#vqstate { target_ram_count = 0 }) -> - rabbit_queue_index:next_segment_boundary(0); -permitted_beta_count(#vqstate { target_ram_count = TargetRamCount, - len = Len }) -> - BetaDelta = lists:max([0, Len - TargetRamCount]), - lists:max([BetaDelta - ((BetaDelta * BetaDelta) div Len), - rabbit_queue_index:next_segment_boundary(0)]). +permitted_beta_count(#vqstate { q1 = Q1, + q4 = Q4, + target_ram_count = TargetRamCount, + len = Len }) -> + BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), + lists:max([rabbit_queue_index:next_segment_boundary(0), + BetaDelta - ((BetaDelta * BetaDelta) div + (BetaDelta + TargetRamCount))]). chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> @@ -1658,12 +1652,7 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> {Q1, {Quota, Delta, IndexState}} = - push_betas_to_deltas1( - Generator, Limit, Q, PushState), - {Q2, Delta1} = - push_gammas_to_deltas(Generator, Limit, Q1, Delta), - {Q2, {Quota, Delta1, IndexState}} + false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) end end. @@ -1680,28 +1669,18 @@ push_betas_to_deltas1(Generator, Limit, Q, {Q, PushState}; {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, seq_id = SeqId }}, Qa} -> - {Quota1, IndexState1} = + IndexState1 = case IndexOnDisk of - true -> {Quota, IndexState}; + true -> IndexState; false -> {#msg_status { index_on_disk = true }, IndexState2} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - {Quota - 1, IndexState2} + IndexState2 end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota1, Delta1, IndexState1}) - end. - -push_gammas_to_deltas(Generator, Limit, Q, Delta) -> - case Generator(Q) of - {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} - when SeqId >= Limit -> - push_gammas_to_deltas(Generator, Limit, Q1, - expand_delta(SeqId, Delta)); - {_, _Q} -> - {Q, Delta} + {Quota - 1, Delta1, IndexState1}) end. %%---------------------------------------------------------------------------- |