summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-12-05 19:31:22 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2010-12-05 19:31:22 +0000
commit37ee768e978377a87bc5d17d072d51adc97547d7 (patch)
tree4c57f06b03aed53aacd7fedccd5bc1b409f43411
parent4551f44b18a848e1dc3d543e5ccc3ca1c9dc2932 (diff)
downloadrabbitmq-server-37ee768e978377a87bc5d17d072d51adc97547d7.tar.gz
cosmetic
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl194
2 files changed, 96 insertions, 100 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8b58b822..b9edad9a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1898,7 +1898,7 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ5),
[{len, Len div 2},
- {target_ram_item_count, 0},
+ {target_ram_count, 0},
{ram_msg_count, 0},
{ram_ack_count, 0}]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ce80bc92..225ded3c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -158,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the
-%% target_ram_item_count and ensures the queue remains responsive
+%% target_ram_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
@@ -256,7 +256,7 @@
len,
persistent_count,
- target_ram_item_count,
+ target_ram_count,
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
@@ -351,7 +351,7 @@
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
- target_ram_item_count :: non_neg_integer() | 'infinity',
+ target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(),
@@ -734,26 +734,24 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-set_ram_duration_target(DurationTarget,
- State = #vqstate {
- rates =
- #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates =
- #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
- target_ram_item_count = TargetRamItemCount }) ->
+set_ram_duration_target(
+ DurationTarget, State = #vqstate {
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
- TargetRamItemCount1 =
+ TargetRamCount1 =
case DurationTarget of
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
- a(case TargetRamItemCount1 == infinity orelse
- (TargetRamItemCount =/= infinity andalso
- TargetRamItemCount1 >= TargetRamItemCount) of
+ State1 = State #vqstate { target_ram_count = TargetRamCount1 },
+ a(case TargetRamCount1 == infinity orelse
+ (TargetRamCount =/= infinity andalso
+ TargetRamCount1 >= TargetRamCount) of
true -> State1;
false -> reduce_memory_use(State1)
end).
@@ -829,40 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
- on_sync = #sync { funs = From },
- target_ram_item_count = TargetRamItemCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- rates = #rates {
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates {
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
- {delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
- {len , Len},
- {pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
- {target_ram_item_count , TargetRamItemCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
- {next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {avg_ingress_rate , AvgIngressRate},
- {avg_egress_rate , AvgEgressRate},
- {avg_ack_ingress_rate , AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+status(#vqstate {
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
+ pending_ack = PA,
+ ram_ack_index = RAI,
+ on_sync = #sync { funs = From },
+ target_ram_count = TargetRamCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {outstanding_txns , length(From)},
+ {target_ram_count , TargetRamCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_ack_count , gb_trees:size(RAI)},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate, AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -1056,37 +1053,37 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- ram_ack_index = gb_trees:empty(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_item_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_ack_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new(),
- unconfirmed = gb_sets:new(),
- ack_out_counter = 0,
- ack_in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
- ack_rates = blank_rate(Now, 0) },
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = blank_rate(Now, DeltaCount1),
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ ack_rates = blank_rate(Now, 0) },
a(maybe_deltas_to_betas(State)).
blank_rate(Timestamp, IngressLength) ->
@@ -1443,7 +1440,7 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
-%% target_ram_item_count of zero or after an incremental alpha->beta
+%% target_ram_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
@@ -1454,23 +1451,22 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_item_count = infinity}) ->
+ State = #vqstate {target_ram_count = infinity}) ->
{false, State};
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount,
- rates = #rates {
- avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates {
- avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress } }) ->
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount,
+ rates = #rates { avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates { avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress }
+ }) ->
{Reduce, State1} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamItemCount) of
+ TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1489,7 +1485,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- case State1 #vqstate.target_ram_item_count of
+ case State1 #vqstate.target_ram_count of
0 -> {Reduce, BetaDeltaFun(State1)};
_ -> case chunk_size(State1 #vqstate.ram_index_count,
permitted_ram_index_count(State1)) of
@@ -1685,11 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount })
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
- TargetRamItemCount =:= infinity orelse
- TargetRamItemCount >= RamMsgCount ->
+ TargetRamCount =:= infinity orelse
+ TargetRamCount >= RamMsgCount ->
{Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of