summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Rob Harrop <rob@rabbitmq.com> 2010-11-05 14:59:21 +0000
committer: Rob Harrop <rob@rabbitmq.com> 2010-11-05 14:59:21 +0000
commit: 3ed9259814c38bc0b65317ab306740dacdea7c21 (patch)
tree: 42974bd2531a0b7eaf3110527ecd4a57bda5b052
parent: 40f91556e3d9a673d4a1c34f9e46f98052b92b45 (diff)
download: rabbitmq-server-3ed9259814c38bc0b65317ab306740dacdea7c21.tar.gz
Reworked ack shedding algorithm based on discussion with Matthew
-rw-r--r-- src/rabbit_variable_queue.erl 125
1 files changed, 71 insertions, 54 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b3f58ef9..9c25540e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -181,11 +181,18 @@
%%
%% During memory reduction, acks stored in message form are converted
%% to tuple form, and the corresponding messages are pushed out to
-%% disk. Message form acks are always pushed to disk before messages
-%% stored in the queue. More precisely, messages from the queue will
-%% not be pushed out to disk while the number of messages form acks is
-%% greater than zero. Message form acks are converted to tuple form in
-%% batches of at most ?IO_BATCH_SIZE.
+%% disk.
+%%
+%% The order in which alphas are pushed to betas and message form acks
+%% are pushed to disk is determined dynamically. We always prefer to
+%% push messages from the source (alphas or acks) that is growing the
+%% fastest (with growth measured as avg. ingress - avg. egress). In
+%% each round of memory reduction a chunk of messages at most
+%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
+%% fastest growing source will be reduced by as much of this chunk as
+%% possible. If there is any remaining allocation in the chunk after
+%% the first source has been reduced to zero, the second source will
+%% be reduced by as much of the remaining chunk as possible.
%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
@@ -779,10 +786,10 @@ ram_duration(State = #vqstate {
ram_ack_count_prev = RamAckCount }}.
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (State1) -> State1 end,
- fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
State),
Res;
needs_idle_timeout(_State) ->
@@ -1365,58 +1372,68 @@ find_persistent_count(LensByStore) ->
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) ->
- {Reduce, State2} = case reduce_ack_memory_use(AckFun, State) of
- {true, State1} ->
- %% Don't want to reduce the number of
- %% ram messages if we might yet be able
- %% to reduce more acks.
- {true, State1};
- {false, State1} ->
- case chunk_size(
- State1 #vqstate.ram_msg_count,
- State1 #vqstate.target_ram_msg_count) of
- 0 -> {false, State1};
- S1 -> {true, AlphaBetaFun(S1, State1)}
- end
- end,
-
- case State2 #vqstate.target_ram_msg_count of
- infinity -> {Reduce, State2};
- 0 -> {Reduce, BetaDeltaFun(State2)};
- _ -> case chunk_size(State2 #vqstate.ram_index_count,
- permitted_ram_index_count(State2)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)};
- _ -> {Reduce, State2}
- end
- end.
-
-reduce_ack_memory_use(_AckFun,
- State = #vqstate { target_ram_msg_count = infinity }) ->
+reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+ State = #vqstate {target_ram_msg_count = infinity}) ->
{false, State};
-reduce_ack_memory_use(AckFun,
- State = #vqstate {
- target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- ram_ack_index = RamAckIndex} ) ->
- PermittedAckCount = case TargetRamMsgCount > RamMsgCount of
- true -> TargetRamMsgCount - RamMsgCount;
- false -> 0
- end,
- case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of
- 0 -> {false, State};
- C -> {true, AckFun(C, State)}
- end.
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+ State = #vqstate {
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ rates = #rates {
+ avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates {
+ avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress } }) ->
+
+ {Reduce, State1} =
+ case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ TargetRamMsgCount) of
+ 0 ->
+ {false, State};
+ S1 ->
+ ReduceFuns =
+ case (AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress) of
+ true ->
+ %% ACKs are growing faster than the queue,
+ %% push messages from there first.
+ [AckFun, AlphaBetaFun];
+ false ->
+ %% The queue is growing faster than the
+ %% acks, push queue messages first.
+ [AlphaBetaFun, AckFun]
+ end,
+ {_, StateOut} =
+ %% Both reduce functions get a chance to reduce
+ %% memory. The second may very well get a quota of
+ %% 0 if the first function managed to push out the
+ %% maximum number of messages.
+ lists:foldl(
+ fun(ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, ReduceFuns),
+ {true, StateOut}
+ end,
+ case State1 #vqstate.target_ram_msg_count of
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
+ end.
limit_ram_acks(0, State) ->
- State;
+ {0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
case gb_trees:is_empty(RAI) of
true ->
- State;
+ {Quota, State};
false ->
{SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status {
@@ -1480,7 +1497,7 @@ permitted_ram_index_count(#vqstate { len = Len,
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
chunk_size(Current, Permitted)
- when Permitted =:= infinity orelse Permitted >= Current ->
+ when Permitted >= Current ->
0;
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
@@ -1568,9 +1585,9 @@ maybe_deltas_to_betas(State = #vqstate {
end.
push_alphas_to_betas(Quota, State) ->
- { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
- {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
- State2.
+ {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ {Quota2, State2}.
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(