summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-27 13:59:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-27 13:59:16 +0000
commitfe13a2eb7718ce7f9ddb34f6b9fe59b8b01d23fe (patch)
tree96c9ee1c225a528306d7d388556330f0d4d838b4
parent81926b8e4d26d53506a784379fb27575881e6ecb (diff)
downloadrabbitmq-server-fe13a2eb7718ce7f9ddb34f6b9fe59b8b01d23fe.tar.gz
Rates on exponential moving average.
-rw-r--r--src/rabbit_variable_queue.erl128
1 files changed, 61 insertions, 67 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 382d7d95..e2d8ecc8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -280,9 +280,7 @@
ack_in_counter
}).
--record(rates, { in_count, out_count, ack_in_count, ack_out_count,
- in_avg, out_avg, ack_in_avg, ack_out_avg,
- timestamp, old_timestamp}).
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
-record(msg_status,
{ seq_id,
@@ -323,16 +321,11 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(rates() :: #rates { in_count :: non_neg_integer(),
- out_count :: non_neg_integer(),
- ack_in_count :: non_neg_integer(),
- ack_out_count :: non_neg_integer(),
- in_avg :: float(),
- out_avg :: float(),
- ack_in_avg :: float(),
- ack_out_avg :: float(),
- timestamp :: timestamp(),
- old_timestamp :: timestamp()}).
+-type(rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
@@ -389,6 +382,15 @@
count = 0,
end_seq_id = Z }).
+-define(MICROS_PER_SECOND, 1000000.0).
+-define(SECONDS_FOR_RATE_AVG, 10.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages, whichever is sooner. We do this
+%% since the priority calcuations in rabbit_amqqueue_process need
+%% fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 1000).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -695,10 +697,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { out_avg = AvgEgressRate,
- in_avg = AvgIngressRate,
- ack_out_avg = AvgAckEgressRate,
- ack_in_avg = AvgAckIngressRate },
+ rates = #rates { out = AvgEgressRate,
+ in = AvgIngressRate,
+ ack_out = AvgAckEgressRate,
+ ack_in = AvgAckIngressRate },
target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
@@ -719,49 +721,46 @@ maybe_update_rates(State = #vqstate { in_counter = InCount,
out_counter = OutCount,
ack_in_counter = AckInCount,
ack_out_counter = AckOutCount }) ->
- case InCount + OutCount + AckInCount + AckOutCount > 1000 of
+ case InCount + OutCount + AckInCount + AckOutCount > ?MSGS_PER_RATE_CALC of
true -> update_rates(State);
false -> State
end.
-update_rates(State = #vqstate{ in_counter = In,
- out_counter = Out,
- ack_in_counter = AckIn,
- ack_out_counter = AckOut,
- rates = #rates{ in_count = In0,
- out_count = Out0,
- ack_in_count = AckIn0,
- ack_out_count = AckOut0,
- timestamp = TS,
- old_timestamp = OldTS }}) ->
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
Now = erlang:now(),
- Rates = #rates { out_count = Out,
- in_count = In,
- ack_out_count = AckOut,
- ack_in_count = AckIn,
- out_avg = update_rate(Now, OldTS, Out0, Out),
- in_avg = update_rate(Now, OldTS, In0, In),
- ack_out_avg = update_rate(Now, OldTS, AckOut0, AckOut),
- ack_in_avg = update_rate(Now, OldTS, AckIn0, AckIn),
- timestamp = Now,
- old_timestamp = TS },
+ Rates = #rates { out = update_rate(Now, TS, OutCount, OutRate),
+ in = update_rate(Now, TS, InCount, InRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ timestamp = Now },
+
State#vqstate{ in_counter = 0,
out_counter = 0,
ack_in_counter = 0,
ack_out_counter = 0,
rates = Rates }.
-update_rate(Now, OldTS, Count, Count0) ->
- %% avg over the current period and the previous
- 1000000.0 * (Count + Count0) / timer:now_diff(Now, OldTS).
+update_rate(Now, TS, Count, Rate0) ->
+ Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ Rate = Count / Time,
+ Weight = erlang:min(1, Time / ?SECONDS_FOR_RATE_AVG),
+ Rate * Weight + Rate0 * (1 - Weight).
ram_duration(State0) ->
State = #vqstate {
- rates = #rates { out_avg = AvgEgressRate,
- in_avg = AvgIngressRate,
- ack_out_avg = AvgAckEgressRate,
- ack_in_avg = AvgAckIngressRate },
+ rates = #rates { out = AvgEgressRate,
+ in = AvgIngressRate,
+ ack_out = AvgAckEgressRate,
+ ack_in = AvgAckIngressRate },
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
@@ -805,8 +804,8 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-msg_rates(#vqstate { rates = #rates { out_avg = AvgEgressRate,
- in_avg = AvgIngressRate } }) ->
+msg_rates(#vqstate { rates = #rates { out = AvgEgressRate,
+ in = AvgIngressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
status(#vqstate {
@@ -818,10 +817,10 @@ status(#vqstate {
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
- rates = #rates { in_avg = AvgIngressRate,
- out_avg = AvgEgressRate,
- ack_in_avg = AvgAckIngressRate,
- ack_out_avg = AvgAckEgressRate }}) ->
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
[ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
@@ -1051,7 +1050,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_count_prev = 0,
out_counter = 0,
in_counter = 0,
- rates = blank_rates(Now, DeltaCount1),
+ rates = blank_rates(Now),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
@@ -1060,17 +1059,12 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ack_in_counter = 0 },
a(maybe_deltas_to_betas(State)).
-blank_rates(Now, IngressLength) ->
- #rates { in_count = IngressLength,
- out_count = 0,
- ack_in_count = 0,
- ack_out_count = 0,
- in_avg = 0.0,
- out_avg = 0.0,
- ack_in_avg = 0.0,
- ack_out_avg = 0.0,
- timestamp = Now,
- old_timestamp = Now}.
+blank_rates(Now) ->
+ #rates { in = 0.0,
+ out = 0.0,
+ ack_in = 0.0,
+ ack_out = 0.0,
+ timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
@@ -1544,10 +1538,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
- rates = #rates { in_avg = AvgIngress,
- out_avg = AvgEgress,
- ack_in_avg = AvgAckIngress,
- ack_out_avg = AvgAckEgress }
+ rates = #rates { in = AvgIngress,
+ out = AvgEgress,
+ ack_in = AvgAckIngress,
+ ack_out = AvgAckEgress }
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =