diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-18 13:21:26 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-18 13:21:26 +0000 |
commit | ef3a945414a2df5779e183f4c7d599e708ff107f (patch) | |
tree | 45242a676ec6970b33ef9ac81b160694b0a9e111 | |
parent | 18acdc0126fa7e36c1611e424d551ba39e70f139 (diff) | |
download | rabbitmq-server-ef3a945414a2df5779e183f4c7d599e708ff107f.tar.gz |
Minor refactorings, and a couple of fixes
-rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 96 |
2 files changed, 54 insertions, 52 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3130bca3..27e4d925 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1870,7 +1870,7 @@ test_variable_queue() -> passed. test_variable_queue_ack_limiting(VQ0) -> - %% start by sending in a bunch of messages < + %% start by sending in a bunch of messages Len = 1024, VQ1 = variable_queue_publish(false, Len, VQ0), @@ -1884,11 +1884,11 @@ test_variable_queue_ack_limiting(VQ0) -> %% fetch half the messages {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), - VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, - {ram_ack_count , Len div 2}, - {ram_msg_count , Len div 2}]), + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count, Len div 2}, + {ram_msg_count, Len div 2}]), - %% quarter the allowed duration + %% ensure all acks go to disk on 0 duration target VQ6 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 48f621e1..e9910a56 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -171,19 +171,18 @@ %% segments. %% %% Pending acks are recorded in memory either as the tuple {SeqId, -%% Guid, MsgProps} (tuple form) or as the message itself (message -%% form). Acks for persistent messages are always stored in the tuple -%% form. Acks for transient messages are also stored in tuple form if -%% the message has been forgotten to disk as part of the memory -%% reduction process. For transient messages that haven't already been -%% written to disk, acks are stored in message form to avoid the -%% overhead of writing to disk. +%% Guid, MsgProps} (tuple-form) or as the message itself (message- +%% form). Acks for persistent messages are always stored in the tuple- +%% form. Acks for transient messages are also stored in tuple-form if +%% the message has been sent to disk as part of the memory reduction +%% process. For transient messages that haven't already been written +%% to disk, acks are stored in message-form. %% -%% During memory reduction, acks stored in message form are converted -%% to tuple form, and the corresponding messages are pushed out to +%% During memory reduction, acks stored in message-form are converted +%% to tuple-form, and the corresponding messages are pushed out to %% disk. %% -%% The order in which alphas are pushed to betas and message form acks +%% The order in which alphas are pushed to betas and message-form acks %% are pushed to disk is determined dynamically. We always prefer to %% push messages for the source (alphas or acks) that is growing the %% fastest (with growth measured as avg. ingress - avg. egress). In @@ -710,14 +709,15 @@ is_empty(State) -> 0 == len(State). set_ram_duration_target(DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, + rates = + #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate }, target_ram_item_count = TargetRamItemCount }) -> - Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate - + AvgAckIngressRate, + Rate = + AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, TargetRamItemCount1 = case DurationTarget of infinity -> infinity; @@ -732,10 +732,11 @@ set_ram_duration_target(DurationTarget, end). ram_duration(State = #vqstate { - rates = #rates { egress = Egress, - ingress = Ingress, - timestamp = Timestamp } = Rates, - ack_rates = #rates { egress = AckEgress, + rates = #rates { timestamp = Timestamp, + egress = Egress, + ingress = Ingress } = Rates, + ack_rates = #rates { timestamp = AckTimestamp, + egress = AckEgress, ingress = AckIngress } = ARates, in_counter = InCount, out_counter = OutCount, @@ -749,10 +750,10 @@ ram_duration(State = #vqstate { {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - {AvgAckEgressRate, AckEgress1} = - update_rate(Now, Timestamp, AckOutCount, AckEgress), + {AvgAckEgressRate, AckEgress1} = + update_rate(Now, AckTimestamp, AckOutCount, AckEgress), {AvgAckIngressRate, AckIngress1} = - update_rate(Now, Timestamp, AckInCount, AckIngress), + update_rate(Now, AckTimestamp, AckInCount, AckIngress), RamAckCount = gb_trees:size(RamAckIndex), @@ -762,7 +763,7 @@ ram_duration(State = #vqstate { true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / - (2 * (AvgEgressRate + AvgIngressRate + + (4 * (AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate)) end, @@ -774,12 +775,15 @@ ram_duration(State = #vqstate { avg_ingress = AvgIngressRate, timestamp = Now }, ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate, + timestamp = Now }, in_counter = 0, out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0, ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. @@ -1047,18 +1051,17 @@ init(IsDurable, IndexState, DeltaCount, Terms, in_counter = 0, ack_out_counter = 0, ack_in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now }, - ack_rates = #rates { egress = {Now, 0}, - ingress = {Now, 0}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = undefined } }, + rates = blank_rate(Now, DeltaCount1), + ack_rates = blank_rate(Now, 0) }, a(maybe_deltas_to_betas(State)). +blank_rate(Timestamp, IngressLength) -> + #rates { egress = {Timestamp, 0}, + ingress = {Timestamp, IngressLength}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Timestamp }. + msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -1315,7 +1318,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, - ram_ack_index = RAI}}) -> + ram_ack_index = RAI }}) -> AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { @@ -1403,28 +1406,27 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, %% acks, push queue messages first. [AlphaBetaFun, AckFun] end, - {_, StateOut} = + {_, State2} = %% Both reduce functions get a chance to reduce %% memory. The second may very well get a quota of %% 0 if the first function managed to push out the %% maximum number of messages. lists:foldl( - fun(ReduceFun, {QuotaN, StateN}) -> + fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, ReduceFuns), - {true, StateOut} + {true, State2} end, case State1 #vqstate.target_ram_item_count of - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} - end + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. - limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, |