diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-11 19:33:43 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-11 19:33:43 +0000 |
commit | 77b9aa447953dbb760c8e0bc766b089a5bdc9e79 (patch) | |
tree | 903e8f35766a94c655ca47d9a58227bb45cecc0f | |
parent | 2885ee4b3f2c8b12415aa0a3808b04ba7c410576 (diff) | |
download | rabbitmq-server-77b9aa447953dbb760c8e0bc766b089a5bdc9e79.tar.gz |
Rename target_ram_msg_count to target_ram_item_count
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 225 |
2 files changed, 114 insertions, 113 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4f543704..3130bca3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1892,7 +1892,7 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ5), [{len, Len div 2}, - {target_ram_msg_count, 0}, + {target_ram_item_count, 0}, {ram_msg_count, 0}, {ram_ack_count, 0}]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbeb9a75..833fc789 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -158,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% any one time. This further smooths the effects of changes to the -%% target_ram_msg_count and ensures the queue remains responsive +%% target_ram_item_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -257,7 +257,7 @@ len, persistent_count, - target_ram_msg_count, + target_ram_item_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, @@ -331,34 +331,34 @@ funs :: [fun (() -> any())] }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), - delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), - next_seq_id :: seq_id(), - pending_ack :: dict:dictionary(), - ram_ack_index :: gb_tree(), - index_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, + q1 :: queue(), + q2 :: bpqueue:bpqueue(), + delta :: delta(), + q3 :: bpqueue:bpqueue(), + q4 :: queue(), + next_seq_id :: seq_id(), + pending_ack :: dict:dictionary(), + ram_ack_index :: gb_tree(), + index_state :: any(), + msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), - durable :: boolean(), - - len :: non_neg_integer(), - persistent_count :: non_neg_integer(), - - transient_threshold :: non_neg_integer(), - target_ram_msg_count :: non_neg_integer() | 'infinity', - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - rates :: rates(), - ack_rates :: rates() }). + on_sync :: sync(), + durable :: boolean(), + + len :: non_neg_integer(), + persistent_count :: non_neg_integer(), + + transient_threshold :: non_neg_integer(), + target_ram_item_count :: non_neg_integer() | 'infinity', + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + ack_out_counter :: non_neg_integer(), + ack_in_counter :: non_neg_integer(), + rates :: rates(), + ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -715,18 +715,18 @@ set_ram_duration_target(DurationTarget, ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate }, - target_ram_msg_count = TargetRamMsgCount }) -> + target_ram_item_count = TargetRamItemCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamMsgCount1 = + TargetRamItemCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 }, + a(case TargetRamItemCount1 == infinity orelse + (TargetRamItemCount =/= infinity andalso + TargetRamItemCount1 >= TargetRamItemCount) of true -> State1; false -> reduce_memory_use(State1) end). @@ -799,39 +799,39 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - ram_ack_index = RAI, - on_sync = #sync { funs = From }, - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_item_count = TargetRamItemCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, - ack_rates = #rates { + ack_rates = #rates { avg_egress = AvgAckEgressRate, avg_ingress = AvgAckIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {ram_ack_count , gb_trees:size(RAI)}, - {outstanding_txns , length(From)}, - {target_ram_msg_count , TargetRamMsgCount}, - {ram_msg_count , RamMsgCount}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate}, - {avg_ack_ingress_rate , AvgAckIngressRate}]. + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {ram_ack_count , gb_trees:size(RAI)}, + {outstanding_txns , length(From)}, + {target_ram_item_count , TargetRamItemCount}, + {ram_msg_count , RamMsgCount}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_egress_rate , AvgEgressRate}, + {avg_ingress_rate , AvgIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate}, + {avg_ack_ingress_rate , AvgAckIngressRate}]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1021,42 +1021,42 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - ram_ack_index = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - ack_out_counter = 0, - ack_in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now }, - ack_rates = #rates { egress = {Now, 0}, - ingress = {Now, 0}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = undefined } }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_item_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + ack_out_counter = 0, + ack_in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now }, + ack_rates = #rates { egress = {Now, 0}, + ingress = {Now, 0}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = undefined } }, a(maybe_deltas_to_betas(State)). msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> @@ -1360,7 +1360,7 @@ find_persistent_count(LensByStore) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_msg_count of zero or after an incremental alpha->beta +%% target_ram_item_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1371,23 +1371,23 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, - State = #vqstate {target_ram_msg_count = infinity}) -> + State = #vqstate {target_ram_item_count = infinity}) -> {false, State}; reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State = #vqstate { - ram_ack_index = RamAckIndex, - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount, - rates = #rates { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount, + rates = #rates { avg_ingress = AvgIngress, avg_egress = AvgEgress }, - ack_rates = #rates { + ack_rates = #rates { avg_ingress = AvgAckIngress, avg_egress = AvgAckEgress } }) -> {Reduce, State1} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), - TargetRamMsgCount) of + TargetRamItemCount) of 0 -> {false, State}; S1 -> @@ -1415,7 +1415,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, {true, StateOut} end, - case State1 #vqstate.target_ram_msg_count of + case State1 #vqstate.target_ram_item_count of 0 -> {Reduce, BetaDeltaFun(State1)}; _ -> case chunk_size(State1 #vqstate.ram_index_count, permitted_ram_index_count(State1)) of @@ -1612,10 +1612,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) + ram_msg_count = RamMsgCount, + target_ram_item_count = TargetRamItemCount }) when Quota =:= 0 orelse - TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + TargetRamItemCount =:= infinity orelse + TargetRamItemCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of |