diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-05 14:59:21 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-05 14:59:21 +0000 |
commit | 3ed9259814c38bc0b65317ab306740dacdea7c21 (patch) | |
tree | 42974bd2531a0b7eaf3110527ecd4a57bda5b052 | |
parent | 40f91556e3d9a673d4a1c34f9e46f98052b92b45 (diff) | |
download | rabbitmq-server-3ed9259814c38bc0b65317ab306740dacdea7c21.tar.gz |
Reworked ack shedding algorithm based on discussion with Matthew
-rw-r--r-- | src/rabbit_variable_queue.erl | 125 |
1 files changed, 71 insertions, 54 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b3f58ef9..9c25540e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -181,11 +181,18 @@ %% %% During memory reduction, acks stored in message form are converted %% to tuple form, and the corresponding messages are pushed out to -%% disk. Message form acks are always pushed to disk before messages -%% stored in the queue. More precisely, messages from the queue will -%% not be pushed out to disk while the number of messages form acks is -%% greater than zero. Message form acks are converted to tuple form in -%% batches of at most ?IO_BATCH_SIZE. +%% disk. +%% +%% The order in which alphas are pushed to betas and message form acks +%% are pushed to disk is determined dynamically. We always prefer to +%% push messages for the source (alphas or acks) that is growing the +%% fastest (with growth measured as avg. ingress - avg. egress). In +%% each round of memory reduction a chunk of messages at most +%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The +%% fastest growing source will be reduced by as much of this chunk as +%% possible. If there is any remaining allocation in the chunk after +%% the first source has been reduced to zero, the second source will +%% be reduced by as much of the remaining chunk as possible. %% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and @@ -779,10 +786,10 @@ ram_duration(State = #vqstate { ram_ack_count_prev = RamAckCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, - fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, State), Res; needs_idle_timeout(_State) -> @@ -1365,58 +1372,68 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> - {Reduce, State2} = case reduce_ack_memory_use(AckFun, State) of - {true, State1} -> - %% Don't want to reduce the number of - %% ram messages if we might yet be able - %% to reduce more acks. - {true, State1}; - {false, State1} -> - case chunk_size( - State1 #vqstate.ram_msg_count, - State1 #vqstate.target_ram_msg_count) of - 0 -> {false, State1}; - S1 -> {true, AlphaBetaFun(S1, State1)} - end - end, - - case State2 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State2}; - 0 -> {Reduce, BetaDeltaFun(State2)}; - _ -> case chunk_size(State2 #vqstate.ram_index_count, - permitted_ram_index_count(State2)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)}; - _ -> {Reduce, State2} - end - end. - -reduce_ack_memory_use(_AckFun, - State = #vqstate { target_ram_msg_count = infinity }) -> +reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, + State = #vqstate {target_ram_msg_count = infinity}) -> {false, State}; -reduce_ack_memory_use(AckFun, - State = #vqstate { - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_ack_index = RamAckIndex} ) -> - PermittedAckCount = case TargetRamMsgCount > RamMsgCount of - true -> TargetRamMsgCount - RamMsgCount; - false -> 0 - end, - case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of - 0 -> {false, State}; - C -> {true, AckFun(C, State)} - end. +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, + State = #vqstate { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount, + rates = #rates { + avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { + avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } }) -> + + {Reduce, State1} = + case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + TargetRamMsgCount) of + 0 -> + {false, State}; + S1 -> + ReduceFuns = + case (AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress) of + true -> + %% ACKs are growing faster than the queue, + %% push messages from there first. + [AckFun, AlphaBetaFun]; + false -> + %% The queue is growing faster than the + %% acks, push queue messages first. + [AlphaBetaFun, AckFun] + end, + {_, StateOut} = + %% Both reduce functions get a chance to reduce + %% memory. The second may very well get a quota of + %% 0 if the first function managed to push out the + %% maximum number of messages. + lists:foldl( + fun(ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, {S1, State}, ReduceFuns), + {true, StateOut} + end, + case State1 #vqstate.target_ram_msg_count of + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end + end. limit_ram_acks(0, State) -> - State; + {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> case gb_trees:is_empty(RAI) of true -> - State; + {Quota, State}; false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { @@ -1480,7 +1497,7 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). chunk_size(Current, Permitted) - when Permitted =:= infinity orelse Permitted >= Current -> + when Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). @@ -1568,9 +1585,9 @@ maybe_deltas_to_betas(State = #vqstate { end. push_alphas_to_betas(Quota, State) -> - { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), - State2. + {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota2, State2}. maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( |