diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-12-05 19:31:22 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-12-05 19:31:22 +0000 |
commit | 37ee768e978377a87bc5d17d072d51adc97547d7 (patch) | |
tree | 4c57f06b03aed53aacd7fedccd5bc1b409f43411 | |
parent | 4551f44b18a848e1dc3d543e5ccc3ca1c9dc2932 (diff) | |
download | rabbitmq-server-37ee768e978377a87bc5d17d072d51adc97547d7.tar.gz |
cosmetic
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 194 |
2 files changed, 96 insertions, 100 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8b58b822..b9edad9a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1898,7 +1898,7 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, - {target_ram_item_count, 0}, + {target_ram_count, 0}, {ram_msg_count, 0}, {ram_ack_count, 0}]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ce80bc92..225ded3c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -158,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% any one time. This further smooths the effects of changes to the -%% target_ram_item_count and ensures the queue remains responsive +%% target_ram_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -256,7 +256,7 @@ len, persistent_count, - target_ram_item_count, + target_ram_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, @@ -351,7 +351,7 @@ persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - target_ram_item_count :: non_neg_integer() | 'infinity', + target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), @@ -734,26 +734,24 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -set_ram_duration_target(DurationTarget, - State = #vqstate { - rates = - #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = - #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, - target_ram_item_count = TargetRamItemCount }) -> +set_ram_duration_target( + DurationTarget, State = #vqstate { + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, + target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamItemCount1 = + TargetRamCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 }, - a(case TargetRamItemCount1 == infinity orelse - (TargetRamItemCount =/= infinity andalso - TargetRamItemCount1 >= TargetRamItemCount) of + State1 = State #vqstate { target_ram_count = TargetRamCount1 }, + a(case TargetRamCount1 == infinity orelse + (TargetRamCount =/= infinity andalso + TargetRamCount1 >= TargetRamCount) of true -> State1; false -> reduce_memory_use(State1) end). @@ -829,40 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - ram_ack_index = RAI, - on_sync = #sync { funs = From }, - target_ram_item_count = TargetRamItemCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, - {target_ram_item_count , TargetRamItemCount}, - {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_ingress_rate , AvgIngressRate}, - {avg_egress_rate , AvgEgressRate}, - {avg_ack_ingress_rate , AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. +status(#vqstate { + q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_count = TargetRamCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {outstanding_txns , length(From)}, + {target_ram_count , TargetRamCount}, + {ram_msg_count , RamMsgCount}, + {ram_ack_count , gb_trees:size(RAI)}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_ingress_rate , AvgIngressRate}, + {avg_egress_rate , AvgEgressRate}, + {avg_ack_ingress_rate, AvgAckIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1056,37 +1053,37 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - ram_ack_index = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_item_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - ack_out_counter = 0, - ack_in_counter = 0, - rates = blank_rate(Now, DeltaCount1), - ack_rates = blank_rate(Now, 0) }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = blank_rate(Now, DeltaCount1), + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new(), + ack_out_counter = 0, + ack_in_counter = 0, + ack_rates = blank_rate(Now, 0) }, a(maybe_deltas_to_betas(State)). blank_rate(Timestamp, IngressLength) -> @@ -1443,7 +1440,7 @@ msg_indices_written_to_disk(QPid, GuidSet) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_item_count of zero or after an incremental alpha->beta +%% target_ram_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1454,23 +1451,22 @@ msg_indices_written_to_disk(QPid, GuidSet) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, - State = #vqstate {target_ram_item_count = infinity}) -> + State = #vqstate {target_ram_count = infinity}) -> {false, State}; reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State = #vqstate { - ram_ack_index = RamAckIndex, - ram_msg_count = RamMsgCount, - target_ram_item_count = TargetRamItemCount, - rates = #rates { - avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { - avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } }) -> + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount, + rates = #rates { avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } + }) -> {Reduce, State1} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), - TargetRamItemCount) of + TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever @@ -1489,7 +1485,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, {true, State2} end, - case State1 #vqstate.target_ram_item_count of + case State1 #vqstate.target_ram_count of 0 -> {Reduce, BetaDeltaFun(State1)}; _ -> case chunk_size(State1 #vqstate.ram_index_count, permitted_ram_index_count(State1)) of @@ -1685,11 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_item_count = TargetRamItemCount }) + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) when Quota =:= 0 orelse - TargetRamItemCount =:= infinity orelse - TargetRamItemCount >= RamMsgCount -> + TargetRamCount =:= infinity orelse + TargetRamCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of |