diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-30 12:30:54 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-30 12:30:54 +0000 |
commit | 5e4fe3ae39437cced587745f36cefd1460e79808 (patch) | |
tree | dcc0ec025bda72bd83a5394209da71088bc0c100 | |
parent | 5be4087156b12449ee14bc6c79cfd62f2e28074b (diff) | |
download | rabbitmq-server-5e4fe3ae39437cced587745f36cefd1460e79808.tar.gz |
Consistent ordering, and only take account of publishes when deciding whether to update rates (since it's only in conditions of high publish rates that we care).
-rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 18 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c4033f28..9a9cb372 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -389,9 +389,9 @@ -define(RATE_AVG_HALF_LIFE, 5.0). %% We will recalculate the #rates{} every time we get asked for our -%% RAM duration, or every N messages, whichever is sooner. We do this -%% since the priority calculations in rabbit_amqqueue_process need -%% fairly fresh rates. +%% RAM duration, or every N messages published, whichever is +%% sooner. We do this since the priority calculations in +%% rabbit_amqqueue_process need fairly fresh rates. -define(MSGS_PER_RATE_CALC, 100). %%---------------------------------------------------------------------------- @@ -700,10 +700,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { out = AvgEgressRate, - in = AvgIngressRate, - ack_out = AvgAckEgressRate, - ack_in = AvgAckIngressRate }, + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, @@ -720,11 +720,8 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -maybe_update_rates(State = #vqstate { in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount }) -> - case InCount + OutCount + AckInCount + AckOutCount > ?MSGS_PER_RATE_CALC of +maybe_update_rates(State = #vqstate { in_counter = InCount }) -> + case InCount > ?MSGS_PER_RATE_CALC of true -> update_rates(State); false -> State end. @@ -740,10 +737,10 @@ update_rates(State = #vqstate{ in_counter = InCount, timestamp = TS }}) -> Now = erlang:now(), - Rates = #rates { out = update_rate(Now, TS, OutCount, OutRate), - in = update_rate(Now, TS, InCount, InRate), - ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), ack_in = update_rate(Now, TS, AckInCount, AckInRate), + ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), timestamp = Now }, State#vqstate{ in_counter = 0, @@ -757,10 +754,10 @@ update_rate(Now, TS, Count, Rate) -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate). ram_duration(State) -> - State1 = #vqstate { rates = #rates { out = AvgEgressRate, - in = AvgIngressRate, - ack_out = AvgAckEgressRate, - ack_in = AvgAckIngressRate }, + State1 = #vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, @@ -805,8 +802,8 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -msg_rates(#vqstate { rates = #rates { out = AvgEgressRate, - in = AvgIngressRate } }) -> +msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. status(#vqstate { |