diff options
author | Marek Majkowski <majek@lshift.net> | 2009-10-07 18:16:48 +0100 |
---|---|---|
committer | Marek Majkowski <majek@lshift.net> | 2009-10-07 18:16:48 +0100 |
commit | 9e23636552326c807e4e1fa5618cace2c6e75f9e (patch) | |
tree | 20919e1eac2a172455c43b436b19a55b5fd6fe51 | |
parent | 3270c8e1c60ca72bd2c7ece7eef6fc83c45981f1 (diff) | |
download | rabbitmq-server-9e23636552326c807e4e1fa5618cace2c6e75f9e.tar.gz |
Rewritten the couting, hopefully, it's simplified now
-rw-r--r-- | src/rabbit_memory_monitor.erl | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index ebbae94a..e878edda 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -66,7 +66,7 @@ %% %% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) %% memory_overcommit = used_memory / allowed_memory -%% desired_drain_avg = memory_overcommit * real_drain_avg +%% desired_drain_avg = real_drain_avg / memory_overcommit -module(rabbit_memory_monitor). @@ -115,16 +115,30 @@ register(Pid) -> %%---------------------------------------------------------------------------- -init([]) -> +get_user_memory_limit() -> %% TODO: References to os_mon and rabbit_memsup_linux %% should go away as bug 21457 removes it. %% BTW: memsup:get_system_memory_data() doesn't work. {state, TotalMemory, _Allocated} = rabbit_memsup_linux:update({state, 0,0}), + MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), + Limit = erlang:trunc(TotalMemory * MemoryHighWatermark), + %% no more than two gigs on 32 bits. + case (Limit > 2*1024*1024*1024) and (erlang:system_info(wordsize) == 4) of + true -> 2*1024*1024*1024; + false -> Limit + end. + + +init([]) -> + %% We should never use more memory than user requested. As the memory + %% manager doesn't really know how much memory queues are using, we shall + %% try to remain safe distance from real limit. + MemoryLimit = get_user_memory_limit() * 0.6, + rabbit_log:warning("Memory monitor limit: ~pMB~n", + [erlang:trunc(MemoryLimit/1024/1024)]), {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, ?SERVER, update, []), - MemoryHighWatermark = os_mon:get_env(memsup, system_memory_high_watermark), - MemoryLimit = erlang:trunc(TotalMemory * MemoryHighWatermark), {ok, #state{timer = TRef, drain_dict = dict:new(), drain_avg = infinity, @@ -171,51 +185,50 @@ ftoa(Float) -> end, lists:flatten(Str). -print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, - PerQueueSeconds, QueueSec) -> - io:format("Update ~s/~s ~s/~s PerQueueSeconds:~s ~s~n", - [ftoa(UsedSeconds), ftoa(AvailableSeconds), - ftoa(UsedMemory/1024.0/1024.0), ftoa(TotalMemory/1024.0/1024.0), - ftoa(PerQueueSeconds), - [" "] ++ lists:flatten([ftoa(Q)++" " || Q <- QueueSec]) - ]). +print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit) -> + io:format("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", + [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), + ftoa(MemoryOvercommit)]). -else. -print_debug_info(_UsedSeconds, _AvailableSeconds, _UsedMemory, _TotalMemory, - _PerQueueSeconds, _QueueSec) -> +print_debug_info(_RealDrainAvg, _DesiredDrainAvg, _MemoryOvercommit) -> ok. -endif. +%% Count average from numbers, excluding atoms in the list. +count_average(List) -> + List1 = [V || V <- List, is_number(V) or is_float(V)], + case length(List1) of + 0 -> infinity; + Len -> lists:sum(List1) / Len + end. + internal_update(State) -> - UsedMemory = erlang:memory(total), - TotalMemory = State#state.memory_limit, - QueueSec = [V || {_K, V} <- dict:to_list(State#state.drain_dict) ], - UsedSeconds = lists:sum( lists:filter(fun (A) -> - is_number(A) or is_float(A) - end, - QueueSec) ), - AvailableSeconds = case UsedSeconds of + %% used memory / available memory + MemoryOvercommit = erlang:memory(total) / State#state.memory_limit, + + RealDrainAvg = count_average([V || {_K, V} <- + dict:to_list(State#state.drain_dict)]), + %% In case of no active queues, feel free to grow. We can't make any + %% decisionswe have no clue what is the average ram_usage/second. + %% Not does the queue. + DesiredDrainAvg = case RealDrainAvg of + infinity -> infinity; 0 -> infinity; 0.0 -> infinity; - _ -> TotalMemory / (UsedMemory / UsedSeconds) - end, - QueuesNumber = dict:size(State#state.drain_dict), - PerQueueSeconds = case (QueuesNumber > 0) and (AvailableSeconds /= infinity) of - true -> AvailableSeconds / QueuesNumber; - false -> infinity + _ -> RealDrainAvg / MemoryOvercommit end, - print_debug_info(UsedSeconds, AvailableSeconds, UsedMemory, TotalMemory, - PerQueueSeconds, QueueSec), + print_debug_info(RealDrainAvg, DesiredDrainAvg, MemoryOvercommit), %% Inform the queue to reduce it's memory usage when needed. %% This can sometimes wake the queue from hibernation. Well, we don't care. - ReduceMemory = fun ({Pid, QueueS}) -> - case QueueS > PerQueueSeconds of + ReduceMemory = fun ({Pid, QueueDrain}) -> + case QueueDrain > DesiredDrainAvg of true -> - gen_server2:cast(Pid, {set_bufsec_limit, PerQueueSeconds}); + gen_server2:cast(Pid, {set_bufsec_limit, DesiredDrainAvg}); _ -> ok end end, lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)), - State#state{drain_avg = PerQueueSeconds}. + State#state{drain_avg = DesiredDrainAvg}. |