diff options
Diffstat (limited to 'src/rabbit_memory_monitor.erl')
-rw-r--r-- | src/rabbit_memory_monitor.erl | 269 |
1 files changed, 0 insertions, 269 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl deleted file mode 100644 index 451ee1f4..00000000 --- a/src/rabbit_memory_monitor.erl +++ /dev/null @@ -1,269 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. -%% - - -%% This module handles the node-wide memory statistics. -%% It receives statistics from all queues, counts the desired -%% queue length (in seconds), and sends this information back to -%% queues. - --module(rabbit_memory_monitor). - --behaviour(gen_server2). - --export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0, conserve_resources/3]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(process, {pid, reported, sent, callback, monitor}). - --record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets #process - queue_duration_sum, %% sum of all queue_durations - queue_duration_count, %% number of elements in sum - desired_duration, %% the desired queue duration - disk_alarm %% disable paging, disk alarm has fired - }). - --define(SERVER, ?MODULE). --define(DEFAULT_UPDATE_INTERVAL, 2500). --define(TABLE_NAME, ?MODULE). - -%% If all queues are pushed to disk (duration 0), then the sum of -%% their reported lengths will be 0. If memory then becomes available, -%% unless we manually intervene, the sum will remain 0, and the queues -%% will never get a non-zero duration. Thus when the mem use is < -%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT. --define(SUM_INC_THRESHOLD, 0.95). --define(SUM_INC_AMOUNT, 1.0). - --define(EPSILON, 0.000001). %% less than this and we clamp to 0 - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). --spec(deregister/1 :: (pid()) -> 'ok'). --spec(report_ram_duration/2 :: - (pid(), float() | 'infinity') -> number() | 'infinity'). --spec(stop/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - -register(Pid, MFA = {_M, _F, _A}) -> - gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). - -deregister(Pid) -> - gen_server2:cast(?SERVER, {deregister, Pid}). - -report_ram_duration(Pid, QueueDuration) -> - gen_server2:call(?SERVER, - {report_ram_duration, Pid, QueueDuration}, infinity). - -stop() -> - gen_server2:cast(?SERVER, stop). - -conserve_resources(Pid, disk, Conserve) -> - gen_server2:cast(Pid, {disk_alarm, Conserve}); -conserve_resources(_Pid, _Source, _Conserve) -> - ok. - -%%---------------------------------------------------------------------------- -%% Gen_server callbacks -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update), - - Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), - Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - {ok, internal_update( - #state { timer = TRef, - queue_durations = Ets, - queue_duration_sum = 0.0, - queue_duration_count = 0, - desired_duration = infinity, - disk_alarm = lists:member(disk, Alarms)})}. - -handle_call({report_ram_duration, Pid, QueueDuration}, From, - State = #state { queue_duration_sum = Sum, - queue_duration_count = Count, - queue_durations = Durations, - desired_duration = SendDuration }) -> - - [Proc = #process { reported = PrevQueueDuration }] = - ets:lookup(Durations, Pid), - - gen_server2:reply(From, SendDuration), - - {Sum1, Count1} = - case {PrevQueueDuration, QueueDuration} of - {infinity, infinity} -> {Sum, Count}; - {infinity, _} -> {Sum + QueueDuration, Count + 1}; - {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; - {_, _} -> {Sum - PrevQueueDuration + QueueDuration, - Count} - end, - true = ets:insert(Durations, Proc #process { reported = QueueDuration, - sent = SendDuration }), - {noreply, State #state { queue_duration_sum = zero_clamp(Sum1), - queue_duration_count = Count1 }}; - -handle_call({register, Pid, MFA}, _From, - State = #state { queue_durations = Durations }) -> - MRef = erlang:monitor(process, Pid), - true = ets:insert(Durations, #process { pid = Pid, reported = infinity, - sent = infinity, callback = MFA, - monitor = MRef }), - {reply, ok, State}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) -> - {noreply, State}; - -handle_cast({disk_alarm, Alarm}, State) -> - {noreply, internal_update(State#state{disk_alarm = Alarm})}; - -handle_cast({deregister, Pid}, State) -> - {noreply, internal_deregister(Pid, true, State)}; - -handle_cast(stop, State) -> - {stop, normal, State}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(update, State) -> - {noreply, internal_update(State)}; - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {noreply, internal_deregister(Pid, false, State)}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, #state { timer = TRef }) -> - timer:cancel(TRef), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%%---------------------------------------------------------------------------- -%% Internal functions -%%---------------------------------------------------------------------------- - -zero_clamp(Sum) when Sum < ?EPSILON -> 0.0; -zero_clamp(Sum) -> Sum. - -internal_deregister(Pid, Demonitor, - State = #state { queue_duration_sum = Sum, - queue_duration_count = Count, - queue_durations = Durations }) -> - case ets:lookup(Durations, Pid) of - [] -> State; - [#process { reported = PrevQueueDuration, monitor = MRef }] -> - true = case Demonitor of - true -> erlang:demonitor(MRef); - false -> true - end, - {Sum1, Count1} = - case PrevQueueDuration of - infinity -> {Sum, Count}; - _ -> {zero_clamp(Sum - PrevQueueDuration), - Count - 1} - end, - true = ets:delete(Durations, Pid), - State #state { queue_duration_sum = Sum1, - queue_duration_count = Count1 } - end. - -internal_update(State = #state{queue_durations = Durations, - desired_duration = DesiredDurationAvg, - disk_alarm = DiskAlarm}) -> - DesiredDurationAvg1 = desired_duration_average(State), - ShouldInform = should_inform_predicate(DiskAlarm), - case ShouldInform(DesiredDurationAvg, DesiredDurationAvg1) of - true -> inform_queues(ShouldInform, DesiredDurationAvg1, Durations); - false -> ok - end, - State#state{desired_duration = DesiredDurationAvg1}. - -desired_duration_average(#state{disk_alarm = true}) -> - infinity; -desired_duration_average(#state{disk_alarm = false, - queue_duration_sum = Sum, - queue_duration_count = Count}) -> - {ok, LimitThreshold} = - application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), - MemoryLimit = vm_memory_monitor:get_memory_limit(), - MemoryRatio = case MemoryLimit > 0.0 of - true -> erlang:memory(total) / MemoryLimit; - false -> infinity - end, - if MemoryRatio =:= infinity -> - 0.0; - MemoryRatio < LimitThreshold orelse Count == 0 -> - infinity; - MemoryRatio < ?SUM_INC_THRESHOLD -> - ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; - true -> - (Sum / Count) / MemoryRatio - end. - -inform_queues(ShouldInform, DesiredDurationAvg, Durations) -> - true = - ets:foldl( - fun (Proc = #process{reported = QueueDuration, - sent = PrevSendDuration, - callback = {M, F, A}}, true) -> - case ShouldInform(PrevSendDuration, DesiredDurationAvg) - andalso ShouldInform(QueueDuration, DesiredDurationAvg) of - true -> ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg]), - ets:insert( - Durations, - Proc#process{sent = DesiredDurationAvg}); - false -> true - end - end, true, Durations). - -%% In normal use, we only inform queues immediately if the desired -%% duration has decreased, we want to ensure timely paging. -should_inform_predicate(false) -> fun greater_than/2; -%% When the disk alarm has gone off though, we want to inform queues -%% immediately if the desired duration has *increased* - we want to -%% ensure timely stopping paging. -should_inform_predicate(true) -> fun (D1, D2) -> greater_than(D2, D1) end. - -greater_than(infinity, infinity) -> false; -greater_than(infinity, _D2) -> true; -greater_than(_D1, infinity) -> false; -greater_than(D1, D2) -> D1 > D2. |