summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-27 15:05:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-27 15:05:01 +0000
commitf728f23ce9313e6457d8a59a5f9a9e85c4479278 (patch)
treea4cce087a2b608eba9ae667d59229fbdab2f9c27
parentfe13a2eb7718ce7f9ddb34f6b9fe59b8b01d23fe (diff)
downloadrabbitmq-server-f728f23ce9313e6457d8a59a5f9a9e85c4479278.tar.gz
Try to get the weighting factor right.
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_queue_consumers.erl10
-rw-r--r--src/rabbit_variable_queue.erl13
3 files changed, 19 insertions, 12 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 80e160d9..45f9751a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -71,6 +71,7 @@
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
+-export([moving_average/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -251,6 +252,7 @@
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
+-spec(moving_average/4 :: (float(), float(), float(), float()) -> float()).
-endif.
%%----------------------------------------------------------------------------
@@ -1088,6 +1090,12 @@ stop_timer(State, Idx) ->
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+moving_average(_Time, _HalfLife, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index bea7e0d0..c9540da8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -27,6 +27,9 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
+%% Utilisation average calculations are all in μs.
+-define(USE_AVG_HALF_LIFE, 1000000.0).
+
-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) ->
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+ rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e2d8ecc8..6df04552 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -383,11 +383,14 @@
end_seq_id = Z }).
-define(MICROS_PER_SECOND, 1000000.0).
--define(SECONDS_FOR_RATE_AVG, 10.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
%% We will recalculate the #rates{} every time we get asked for our
%% RAM duration, or every N messages, whichever is sooner. We do this
-%% since the priority calcuations in rabbit_amqqueue_process need
+%% since the priority calculations in rabbit_amqqueue_process need
%% fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 1000).
@@ -749,11 +752,9 @@ update_rates(State = #vqstate{ in_counter = InCount,
ack_out_counter = 0,
rates = Rates }.
-update_rate(Now, TS, Count, Rate0) ->
+update_rate(Now, TS, Count, Rate) ->
Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
- Rate = Count / Time,
- Weight = erlang:min(1, Time / ?SECONDS_FOR_RATE_AVG),
- Rate * Weight + Rate0 * (1 - Weight).
+ rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
ram_duration(State0) ->
State = #vqstate {