diff options
author | Marek Majkowski <majek@lshift.net> | 2009-10-22 12:13:14 -0400 |
---|---|---|
committer | Marek Majkowski <majek@lshift.net> | 2009-10-22 12:13:14 -0400 |
commit | 155dab5b8d4cdf6cbea0b3a229cfff3f54ed6db5 (patch) | |
tree | 161ff1c4913dde84e1d171f28cf7c1d15109be4a | |
parent | 504c65eb7ccdd77d7ca30360deb943fa34dd7625 (diff) | |
download | rabbitmq-server-155dab5b8d4cdf6cbea0b3a229cfff3f54ed6db5.tar.gz |
QA: changed names to: queue_duration, changed MemoryOvercommit to be available/used (instead of used/available)
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_memory_monitor.erl | 53 |
2 files changed, 30 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d0123989..a5400254 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -823,7 +823,7 @@ handle_cast(send_memory_monitor_update, State) -> true -> infinity; false -> queue:len(State#q.message_buffer) / MsgSec end, - gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}), + rabbit_memory_monitor:push_queue_duration(self(), BufSec), noreply(State#q{drain_ratio = DrainRatio1}); handle_cast({set_bufsec_limit, BufSec}, State) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 87ee96ad..8c1db615 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -45,7 +45,7 @@ %% v | v | | %% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> %% -%% Or to put it in words. Queue periodically sends (casts) 'push_drain_ratio' +%% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration' %% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor %% _always_ replies with a 'set_bufsec_limit' cast. This way, %% we're pretty sure that the Queue is not hibernated. @@ -58,15 +58,15 @@ %% The main job of this module, is to make sure that all the queues have %% more or less the same number of seconds till become drained. %% This average, seconds-till-queue-is-drained, is then multiplied by -%% the ratio of Used/Total memory. So, if we can 'afford' more memory to be +%% the ratio of Total/Used memory. So, if we can 'afford' more memory to be %% used, we'll report greater number back to the queues. In the out of %% memory case, we are going to reduce the average drain-seconds. %% To acheive all this we need to accumulate the information from every %% queue, and count an average from that. %% -%% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) -%% memory_overcommit = used_memory / allowed_memory -%% desired_drain_avg = real_drain_avg / memory_overcommit +%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) +%% memory_overcommit = allowed_memory / used_memory +%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit -module(rabbit_memory_monitor). @@ -81,12 +81,12 @@ -export([update/0]). --export([register/1]). +-export([register/1, push_queue_duration/2]). --record(state, {timer, %% 'internal_update' timer - drain_dict, %% dict, queue_pid:seconds_till_queue_is_empty - drain_avg, %% global, the desired queue depth (in seconds) - memory_limit %% how much memory we intend to use +-record(state, {timer, %% 'internal_update' timer + queue_duration_dict, %% dict, qpid:seconds_till_queue_is_empty + queue_duration_avg, %% global, the desired queue depth (in sec) + memory_limit %% how much memory we intend to use }). -define(SERVER, ?MODULE). @@ -97,6 +97,7 @@ -spec(start_link/0 :: () -> 'ignore' | {'error',_} | {'ok',pid()}). -spec(register/1 :: (pid()) -> ok). +-spec(push_queue_duration/2 :: (pid(), float() | infinity) -> ok). -spec(init/1 :: ([]) -> {ok, #state{}}). @@ -121,6 +122,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). +push_queue_duration(Pid, BufSec) -> + gen_server2:cast(rabbit_memory_monitor, {push_queue_duration, Pid, BufSec}). + %%---------------------------------------------------------------------------- get_user_memory_limit() -> @@ -148,8 +152,8 @@ init([]) -> {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, ?SERVER, update, []), {ok, #state{timer = TRef, - drain_dict = dict:new(), - drain_avg = infinity, + queue_duration_dict = dict:new(), + queue_duration_avg = infinity, memory_limit = MemoryLimit}}. handle_call(_Request, _From, State) -> @@ -163,17 +167,18 @@ handle_cast({register, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}; -handle_cast({push_drain_ratio, Pid, DrainRatio}, State) -> - gen_server2:cast(Pid, {set_bufsec_limit, State#state.drain_avg}), - {noreply, State#state{drain_dict = - dict:store(Pid, DrainRatio, State#state.drain_dict)}}; +handle_cast({push_queue_duration, Pid, DrainRatio}, State) -> + gen_server2:cast(Pid, {set_bufsec_limit, State#state.queue_duration_avg}), + {noreply, State#state{queue_duration_dict = + dict:store(Pid, DrainRatio, State#state.queue_duration_dict)}}; handle_cast(_Request, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {noreply, State#state{drain_dict = dict:erase(Pid, State#state.drain_dict)}}; + {noreply, State#state{queue_duration_dict = + dict:erase(Pid, State#state.queue_duration_dict)}}; handle_info(_Info, State) -> {noreply, State}. @@ -203,18 +208,18 @@ count_average(List) -> end. internal_update(State) -> - %% used memory / available memory - MemoryOvercommit = erlang:memory(total) / State#state.memory_limit, - + %% available memory / used memory + UsedMemory = erlang:memory(total), + MemoryOvercommit = State#state.memory_limit / UsedMemory, RealDrainAvg = count_average([V || {_K, V} <- - dict:to_list(State#state.drain_dict)]), + dict:to_list(State#state.queue_duration_dict)]), %% In case of no active queues, feel free to grow. We can't make any %% decisionswe have no clue what is the average ram_usage/second. %% Not does the queue. DesiredDrainAvg = case RealDrainAvg of infinity -> infinity; 0.0 -> infinity; - _ -> RealDrainAvg / MemoryOvercommit + _ -> RealDrainAvg * MemoryOvercommit end, ?LOGDEBUG("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n", [ftoa(RealDrainAvg), ftoa(DesiredDrainAvg), @@ -228,7 +233,7 @@ internal_update(State) -> _ -> ok end end, - lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)), - State#state{drain_avg = DesiredDrainAvg}. + lists:map(ReduceMemory, dict:to_list(State#state.queue_duration_dict)), + State#state{queue_duration_avg = DesiredDrainAvg}. |