summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-30 12:30:54 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-30 12:30:54 +0000
commit5e4fe3ae39437cced587745f36cefd1460e79808 (patch)
treedcc0ec025bda72bd83a5394209da71088bc0c100
parent5be4087156b12449ee14bc6c79cfd62f2e28074b (diff)
downloadrabbitmq-server-5e4fe3ae39437cced587745f36cefd1460e79808.tar.gz
Consistent ordering, and only take account of publishes when deciding whether to update rates (since it's only in conditions of high publish rates that we care).
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 18 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c4033f28..9a9cb372 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -389,9 +389,9 @@
-define(RATE_AVG_HALF_LIFE, 5.0).
%% We will recalculate the #rates{} every time we get asked for our
-%% RAM duration, or every N messages, whichever is sooner. We do this
-%% since the priority calculations in rabbit_amqqueue_process need
-%% fairly fresh rates.
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
%%----------------------------------------------------------------------------
@@ -700,10 +700,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { out = AvgEgressRate,
- in = AvgIngressRate,
- ack_out = AvgAckEgressRate,
- ack_in = AvgAckIngressRate },
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
@@ -720,11 +720,8 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-maybe_update_rates(State = #vqstate { in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount }) ->
- case InCount + OutCount + AckInCount + AckOutCount > ?MSGS_PER_RATE_CALC of
+maybe_update_rates(State = #vqstate { in_counter = InCount }) ->
+ case InCount > ?MSGS_PER_RATE_CALC of
true -> update_rates(State);
false -> State
end.
@@ -740,10 +737,10 @@ update_rates(State = #vqstate{ in_counter = InCount,
timestamp = TS }}) ->
Now = erlang:now(),
- Rates = #rates { out = update_rate(Now, TS, OutCount, OutRate),
- in = update_rate(Now, TS, InCount, InRate),
- ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
timestamp = Now },
State#vqstate{ in_counter = 0,
@@ -757,10 +754,10 @@ update_rate(Now, TS, Count, Rate) ->
rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
ram_duration(State) ->
- State1 = #vqstate { rates = #rates { out = AvgEgressRate,
- in = AvgIngressRate,
- ack_out = AvgAckEgressRate,
- ack_in = AvgAckIngressRate },
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
@@ -805,8 +802,8 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-msg_rates(#vqstate { rates = #rates { out = AvgEgressRate,
- in = AvgIngressRate } }) ->
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
status(#vqstate {