summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <majek@lshift.net>2009-10-07 18:16:48 +0100
committerMarek Majkowski <majek@lshift.net>2009-10-07 18:16:48 +0100
commit9e23636552326c807e4e1fa5618cace2c6e75f9e (patch)
tree20919e1eac2a172455c43b436b19a55b5fd6fe51
parent3270c8e1c60ca72bd2c7ece7eef6fc83c45981f1 (diff)
downloadrabbitmq-server-9e23636552326c807e4e1fa5618cace2c6e75f9e.tar.gz
Rewritten the couting, hopefully, it's simplified now
-rw-r--r--src/rabbit_memory_monitor.erl81
1 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index ebbae94a..e878edda 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -66,7 +66,7 @@
%%
%% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...])
%% memory_overcommit = used_memory / allowed_memory
-%% desired_drain_avg = memory_overcommit * real_drain_avg
+%% desired_drain_avg = real_drain_avg / memory_overcommit
-module(rabbit_memory_monitor).
@@ -115,16 +115,30 @@ register(Pid) ->
%%----------------------------------------------------------------------------
-init([]) ->
+get_user_memory_limit() ->
%% TODO: References to os_mon and rabbit_memsup_linux
%% should go away as bug 21457 removes it.
%% BTW: memsup:get_system_memory_data() doesn't work.
{state, TotalMemory, _Allocated} = rabbit_memsup_linux:update({state, 0,0}),
+ MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark),
+ Limit = erlang:trunc(TotalMemory * MemoryHighWatermark),
+ %% no more than two gigs on 32 bits.
+ case (Limit > 2*1024*1024*1024) and (erlang:system_info(wordsize) == 4) of
+ true -> 2*1024*1024*1024;
+ false -> Limit
+ end.
+
+
+init([]) ->
+ %% We should never use more memory than user requested. As the memory
+ %% manager doesn't really know how much memory queues are using, we shall
+ %% try to remain safe distance from real limit.
+ MemoryLimit = get_user_memory_limit() * 0.6,
+ rabbit_log:warning("Memory monitor limit: ~pMB~n",
+ [erlang:trunc(MemoryLimit/1024/1024)]),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
?SERVER, update, []),
- MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark),
- MemoryLimit = erlang:trunc(TotalMemory * MemoryHighWatermark),
{ok, #state{timer = TRef,
drain_dict = dict:new(),
drain_avg = infinity,
@@ -171,51 +185,50 @@ ftoa(Float) ->
end,
lists:flatten(Str).
-print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory,
- PerQueueSeconds, QueueSec) ->
- io:format("Update ~s/~s ~s/~s PerQueueSeconds:~s ~s~n",
- [ftoa(UsedSeconds), ftoa(AvailableSeconds),
- ftoa(UsedMemory/1024.0/1024.0), ftoa(TotalMemory/1024.0/1024.0),
- ftoa(PerQueueSeconds),
- [" "] ++ lists:flatten([ftoa(Q)++" " || Q <- QueueSec])
- ]).
+print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit) ->
+ io:format("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n",
+ [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg),
+ ftoa(MemoryOvercommit)]).
-else.
-print_debug_info(_UsedSeconds, _AvailableSeconds, _UsedMemory, _TotalMemory,
- _PerQueueSeconds, _QueueSec) ->
+print_debug_info(_RealDrainAvg, _DesiredDrainAvg, _MemoryOvercommit) ->
ok.
-endif.
+%% Count average from numbers, excluding atoms in the list.
+count_average(List) ->
+ List1 = [V || V <- List, is_number(V) or is_float(V)],
+ case length(List1) of
+ 0 -> infinity;
+ Len -> lists:sum(List1) / Len
+ end.
+
internal_update(State) ->
- UsedMemory = erlang:memory(total),
- TotalMemory = State#state.memory_limit,
- QueueSec = [V || {_K, V} <- dict:to_list(State#state.drain_dict) ],
- UsedSeconds = lists:sum( lists:filter(fun (A) ->
- is_number(A) or is_float(A)
- end,
- QueueSec) ),
- AvailableSeconds = case UsedSeconds of
+ %% used memory / available memory
+ MemoryOvercommit = erlang:memory(total) / State#state.memory_limit,
+
+ RealDrainAvg = count_average([V || {_K, V} <-
+ dict:to_list(State#state.drain_dict)]),
+ %% In case of no active queues, feel free to grow. We can't make any
+ %% decisionswe have no clue what is the average ram_usage/second.
+ %% Not does the queue.
+ DesiredDrainAvg = case RealDrainAvg of
+ infinity -> infinity;
0 -> infinity;
0.0 -> infinity;
- _ -> TotalMemory / (UsedMemory / UsedSeconds)
- end,
- QueuesNumber = dict:size(State#state.drain_dict),
- PerQueueSeconds = case (QueuesNumber > 0) and (AvailableSeconds /= infinity) of
- true -> AvailableSeconds / QueuesNumber;
- false -> infinity
+ _ -> RealDrainAvg / MemoryOvercommit
end,
- print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory,
- PerQueueSeconds, QueueSec),
+ print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit),
%% Inform the queue to reduce it's memory usage when needed.
%% This can sometimes wake the queue from hibernation. Well, we don't care.
- ReduceMemory = fun ({Pid, QueueS}) ->
- case QueueS > PerQueueSeconds of
+ ReduceMemory = fun ({Pid, QueueDrain}) ->
+ case QueueDrain > DesiredDrainAvg of
true ->
- gen_server2:cast(Pid, {set_bufsec_limit, PerQueueSeconds});
+ gen_server2:cast(Pid, {set_bufsec_limit, DesiredDrainAvg});
_ -> ok
end
end,
lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)),
- State#state{drain_avg = PerQueueSeconds}.
+ State#state{drain_avg = DesiredDrainAvg}.