diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-27 13:59:16 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-27 13:59:16 +0000 |
commit | fe13a2eb7718ce7f9ddb34f6b9fe59b8b01d23fe (patch) | |
tree | 96c9ee1c225a528306d7d388556330f0d4d838b4 | |
parent | 81926b8e4d26d53506a784379fb27575881e6ecb (diff) | |
download | rabbitmq-server-fe13a2eb7718ce7f9ddb34f6b9fe59b8b01d23fe.tar.gz |
Rates on exponential moving average.
-rw-r--r-- | src/rabbit_variable_queue.erl | 128 |
1 files changed, 61 insertions, 67 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 382d7d95..e2d8ecc8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -280,9 +280,7 @@ ack_in_counter }). --record(rates, { in_count, out_count, ack_in_count, ack_out_count, - in_avg, out_avg, ack_in_avg, ack_out_avg, - timestamp, old_timestamp}). +-record(rates, { in, out, ack_in, ack_out, timestamp }). -record(msg_status, { seq_id, @@ -323,16 +321,11 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(rates() :: #rates { in_count :: non_neg_integer(), - out_count :: non_neg_integer(), - ack_in_count :: non_neg_integer(), - ack_out_count :: non_neg_integer(), - in_avg :: float(), - out_avg :: float(), - ack_in_avg :: float(), - ack_out_avg :: float(), - timestamp :: timestamp(), - old_timestamp :: timestamp()}). +-type(rates() :: #rates { in :: float(), + out :: float(), + ack_in :: float(), + ack_out :: float(), + timestamp :: timestamp()}). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer(), @@ -389,6 +382,15 @@ count = 0, end_seq_id = Z }). +-define(MICROS_PER_SECOND, 1000000.0). +-define(SECONDS_FOR_RATE_AVG, 10.0). + +%% We will recalculate the #rates{} every time we get asked for our +%% RAM duration, or every N messages, whichever is sooner. We do this +%% since the priority calcuations in rabbit_amqqueue_process need +%% fairly fresh rates. +-define(MSGS_PER_RATE_CALC, 1000). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -695,10 +697,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { out_avg = AvgEgressRate, - in_avg = AvgIngressRate, - ack_out_avg = AvgAckEgressRate, - ack_in_avg = AvgAckIngressRate }, + rates = #rates { out = AvgEgressRate, + in = AvgIngressRate, + ack_out = AvgAckEgressRate, + ack_in = AvgAckIngressRate }, target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, @@ -719,49 +721,46 @@ maybe_update_rates(State = #vqstate { in_counter = InCount, out_counter = OutCount, ack_in_counter = AckInCount, ack_out_counter = AckOutCount }) -> - case InCount + OutCount + AckInCount + AckOutCount > 1000 of + case InCount + OutCount + AckInCount + AckOutCount > ?MSGS_PER_RATE_CALC of true -> update_rates(State); false -> State end. -update_rates(State = #vqstate{ in_counter = In, - out_counter = Out, - ack_in_counter = AckIn, - ack_out_counter = AckOut, - rates = #rates{ in_count = In0, - out_count = Out0, - ack_in_count = AckIn0, - ack_out_count = AckOut0, - timestamp = TS, - old_timestamp = OldTS }}) -> +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, + ack_out = AckOutRate, + timestamp = TS }}) -> Now = erlang:now(), - Rates = #rates { out_count = Out, - in_count = In, - ack_out_count = AckOut, - ack_in_count = AckIn, - out_avg = update_rate(Now, OldTS, Out0, Out), - in_avg = update_rate(Now, OldTS, In0, In), - ack_out_avg = update_rate(Now, OldTS, AckOut0, AckOut), - ack_in_avg = update_rate(Now, OldTS, AckIn0, AckIn), - timestamp = Now, - old_timestamp = TS }, + Rates = #rates { out = update_rate(Now, TS, OutCount, OutRate), + in = update_rate(Now, TS, InCount, InRate), + ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), + timestamp = Now }, + State#vqstate{ in_counter = 0, out_counter = 0, ack_in_counter = 0, ack_out_counter = 0, rates = Rates }. -update_rate(Now, OldTS, Count, Count0) -> - %% avg over the current period and the previous - 1000000.0 * (Count + Count0) / timer:now_diff(Now, OldTS). +update_rate(Now, TS, Count, Rate0) -> + Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND, + Rate = Count / Time, + Weight = erlang:min(1, Time / ?SECONDS_FOR_RATE_AVG), + Rate * Weight + Rate0 * (1 - Weight). ram_duration(State0) -> State = #vqstate { - rates = #rates { out_avg = AvgEgressRate, - in_avg = AvgIngressRate, - ack_out_avg = AvgAckEgressRate, - ack_in_avg = AvgAckIngressRate }, + rates = #rates { out = AvgEgressRate, + in = AvgIngressRate, + ack_out = AvgAckEgressRate, + ack_in = AvgAckIngressRate }, ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, @@ -805,8 +804,8 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -msg_rates(#vqstate { rates = #rates { out_avg = AvgEgressRate, - in_avg = AvgIngressRate } }) -> +msg_rates(#vqstate { rates = #rates { out = AvgEgressRate, + in = AvgIngressRate } }) -> {AvgIngressRate, AvgEgressRate}. status(#vqstate { @@ -818,10 +817,10 @@ status(#vqstate { ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, - rates = #rates { in_avg = AvgIngressRate, - out_avg = AvgEgressRate, - ack_in_avg = AvgAckIngressRate, - ack_out_avg = AvgAckEgressRate }}) -> + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }}) -> [ {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, @@ -1051,7 +1050,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_count_prev = 0, out_counter = 0, in_counter = 0, - rates = blank_rates(Now, DeltaCount1), + rates = blank_rates(Now), msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), @@ -1060,17 +1059,12 @@ init(IsDurable, IndexState, DeltaCount, Terms, ack_in_counter = 0 }, a(maybe_deltas_to_betas(State)). -blank_rates(Now, IngressLength) -> - #rates { in_count = IngressLength, - out_count = 0, - ack_in_count = 0, - ack_out_count = 0, - in_avg = 0.0, - out_avg = 0.0, - ack_in_avg = 0.0, - ack_out_avg = 0.0, - timestamp = Now, - old_timestamp = Now}. +blank_rates(Now) -> + #rates { in = 0.0, + out = 0.0, + ack_in = 0.0, + ack_out = 0.0, + timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> @@ -1544,10 +1538,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, - rates = #rates { in_avg = AvgIngress, - out_avg = AvgEgress, - ack_in_avg = AvgAckIngress, - ack_out_avg = AvgAckEgress } + rates = #rates { in = AvgIngress, + out = AvgEgress, + ack_in = AvgAckIngress, + ack_out = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = |