summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-17 12:47:09 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-17 12:47:09 +0100
commitbcc19963081e7abfcc205840b73fed4d5784ab1a (patch)
treeef5b4d926cd8fe4d46a46ef5597bdb6b47761d9f
parenta97c0c40d238a1ac81d2f08c8ecd5bba1fb310b7 (diff)
downloadrabbitmq-server-bcc19963081e7abfcc205840b73fed4d5784ab1a.tar.gz
Having thought about and discussed the code and desires, change things around a bit.
-rw-r--r--src/rabbit_variable_queue.erl73
1 files changed, 26 insertions, 47 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 131d90cb..37e822da 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -136,24 +136,14 @@
%% less and gammas and betas more, which will cost more memory, but
%% require fewer disk ops and less CPU overhead.
%%
-%% The decision taken is that once a gamma is at the head of q2, or at
-%% the tail of q3, it'll be immediately rolled into delta. This
-%% ensures that the memory use of a queue stays relatively flat, even
-%% as it gets longer: we avoid a cliff face where an event forces
-%% potentially millions of gammas to be suddenly rolled into
-%% delta. This comes at the cost of needing to do additional reads
-%% from queue_index as gammas are eventually brought out of delta.
-%%
-%% However, that does not yet explain when a beta is converted to a
-%% gamma. In the case of a persistent msg published to a durable
-%% queue, the msg is immediately written to the msg_store and
-%% queue_index. If then additionally converted from an alpha, it'll
-%% immediately go to a gamma (as it's already in queue_index), and
-%% cannot exist as a beta. Thus a durable queue with a mixture of
-%% persistent and transient msgs in it which has more messages than
-%% permitted by the target_ram_count may contain an interspersed
-%% mixture of betas and gammas in q2 and q3, but with a beta at the
-%% head of q2 and the tail of q3.
+%% In the case of a persistent msg published to a durable queue, the
+%% msg is immediately written to the msg_store and queue_index. If
+%% then additionally converted from an alpha, it'll immediately go to
+%% a gamma (as it's already in queue_index), and cannot exist as a
+%% beta. Thus a durable queue with a mixture of persistent and
+%% transient msgs in it which has more messages than permitted by the
+%% target_ram_count may contain an interspersed mixture of betas and
+%% gammas in q2 and q3.
%%
%% There is then a ratio that controls how many betas and gammas there
%% can be. This is based on the target_ram_count and thus expresses
@@ -161,8 +151,8 @@
%% so should the number of betas and gammas fall (i.e. delta
%% grows). If q2 and q3 contain more than the permitted number of
%% betas and gammas, then the surplus are forcibly converted to gammas
-%% (as necessary) and then rolled into delta. The ratio is that the
-%% size of delta / (betas+gammas+delta) should equal
+%% (as necessary) and then rolled into delta. The ratio is that
+%% delta/(betas+gammas+delta) equals
%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
%% the target_ram_count shrinks to 0, so must betas and gammas.
%%
@@ -1489,15 +1479,19 @@ reduce_memory_use(State) ->
State),
State1.
+permitted_beta_count(#vqstate { target_ram_count = 0,
+ q3 = Q3 }) ->
+ lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
-permitted_beta_count(#vqstate { target_ram_count = 0 }) ->
- rabbit_queue_index:next_segment_boundary(0);
-permitted_beta_count(#vqstate { target_ram_count = TargetRamCount,
- len = Len }) ->
- BetaDelta = lists:max([0, Len - TargetRamCount]),
- lists:max([BetaDelta - ((BetaDelta * BetaDelta) div Len),
- rabbit_queue_index:next_segment_boundary(0)]).
+permitted_beta_count(#vqstate { q1 = Q1,
+ q4 = Q4,
+ target_ram_count = TargetRamCount,
+ len = Len }) ->
+ BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
+ lists:max([rabbit_queue_index:next_segment_boundary(0),
+ BetaDelta - ((BetaDelta * BetaDelta) div
+ (BetaDelta + TargetRamCount))]).
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1658,12 +1652,7 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> {Q1, {Quota, Delta, IndexState}} =
- push_betas_to_deltas1(
- Generator, Limit, Q, PushState),
- {Q2, Delta1} =
- push_gammas_to_deltas(Generator, Limit, Q1, Delta),
- {Q2, {Quota, Delta1, IndexState}}
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
end
end.
@@ -1680,28 +1669,18 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Q, PushState};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
- {Quota1, IndexState1} =
+ IndexState1 =
case IndexOnDisk of
- true -> {Quota, IndexState};
+ true -> IndexState;
false -> {#msg_status { index_on_disk = true },
IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,
IndexState),
- {Quota - 1, IndexState2}
+ IndexState2
end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota1, Delta1, IndexState1})
- end.
-
-push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
- case Generator(Q) of
- {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}
- when SeqId >= Limit ->
- push_gammas_to_deltas(Generator, Limit, Q1,
- expand_delta(SeqId, Delta));
- {_, _Q} ->
- {Q, Delta}
+ {Quota - 1, Delta1, IndexState1})
end.
%%----------------------------------------------------------------------------