author      Marek Majkowski <majek@lshift.net>    2009-10-22 12:13:14 -0400
committer   Marek Majkowski <majek@lshift.net>    2009-10-22 12:13:14 -0400
commit      155dab5b8d4cdf6cbea0b3a229cfff3f54ed6db5 (patch)
tree        161ff1c4913dde84e1d171f28cf7c1d15109be4a
parent      504c65eb7ccdd77d7ca30360deb943fa34dd7625 (diff)
download    rabbitmq-server-155dab5b8d4cdf6cbea0b3a229cfff3f54ed6db5.tar.gz
QA: changed names to: queue_duration, changed MemoryOvercommit to be available/used (instead of used/available)
-rw-r--r--   src/rabbit_amqqueue_process.erl    2
-rw-r--r--   src/rabbit_memory_monitor.erl     53
2 files changed, 30 insertions(+), 25 deletions(-)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d0123989..a5400254 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -823,7 +823,7 @@ handle_cast(send_memory_monitor_update, State) ->
true -> infinity;
false -> queue:len(State#q.message_buffer) / MsgSec
end,
- gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}),
+ rabbit_memory_monitor:push_queue_duration(self(), BufSec),
noreply(State#q{drain_ratio = DrainRatio1});
handle_cast({set_bufsec_limit, BufSec}, State) ->
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 87ee96ad..8c1db615 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -45,7 +45,7 @@
%% v | v | |
%% Monitor X--*-+--X---*-+--X------X----X-----X+----------->
%%
-%% Or to put it in words. Queue periodically sends (casts) 'push_drain_ratio'
+%% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration'
%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor
%% _always_ replies with a 'set_bufsec_limit' cast. This way,
%% we're pretty sure that the Queue is not hibernated.
@@ -58,15 +58,15 @@
%% The main job of this module is to make sure that all the queues have
%% more or less the same number of seconds until they become drained.
%% This average, seconds-till-queue-is-drained, is then multiplied by
-%% the ratio of Used/Total memory. So, if we can 'afford' more memory to be
+%% the ratio of Total/Used memory. So, if we can 'afford' more memory to be
%% used, we'll report a greater number back to the queues. In the out of
%% memory case, we are going to reduce the average drain-seconds.
%% To achieve all this we need to accumulate the information from every
%% queue, and count an average from that.
%%
-%% real_drain_avg = avg([drain_from_queue_1, queue_2, queue_3, ...])
-%% memory_overcommit = used_memory / allowed_memory
-%% desired_drain_avg = real_drain_avg / memory_overcommit
+%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...])
+%% memory_overcommit = allowed_memory / used_memory
+%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit
-module(rabbit_memory_monitor).
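
For reference, a minimal standalone sketch of the arithmetic described in the comment block above; the module name duration_example and the function desired_avg/3 are illustrative only and are not part of rabbitmq-server:

-module(duration_example).
-export([desired_avg/3]).

%% Illustrative only: compute the desired average queue duration from the
%% per-queue drain durations and the memory headroom, mirroring the formula
%% in the comment above (memory_overcommit = allowed_memory / used_memory).
desired_avg(QueueDurations, AllowedMemory, UsedMemory) ->
    RealAvg = lists:sum(QueueDurations) / length(QueueDurations),
    MemoryOvercommit = AllowedMemory / UsedMemory,
    RealAvg * MemoryOvercommit.

%% 1> duration_example:desired_avg([10.0, 20.0, 30.0], 2048, 1024).
%% 40.0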
@@ -81,12 +81,12 @@
-export([update/0]).
--export([register/1]).
+-export([register/1, push_queue_duration/2]).
--record(state, {timer, %% 'internal_update' timer
- drain_dict, %% dict, queue_pid:seconds_till_queue_is_empty
- drain_avg, %% global, the desired queue depth (in seconds)
- memory_limit %% how much memory we intend to use
+-record(state, {timer, %% 'internal_update' timer
+ queue_duration_dict, %% dict, qpid:seconds_till_queue_is_empty
+ queue_duration_avg, %% global, the desired queue depth (in sec)
+ memory_limit %% how much memory we intend to use
}).
-define(SERVER, ?MODULE).
@@ -97,6 +97,7 @@
-spec(start_link/0 :: () -> 'ignore' | {'error',_} | {'ok',pid()}).
-spec(register/1 :: (pid()) -> ok).
+-spec(push_queue_duration/2 :: (pid(), float() | infinity) -> ok).
-spec(init/1 :: ([]) -> {ok, #state{}}).
@@ -121,6 +122,9 @@ update() ->
register(Pid) ->
gen_server2:cast(?SERVER, {register, Pid}).
+push_queue_duration(Pid, BufSec) ->
+ gen_server2:cast(rabbit_memory_monitor, {push_queue_duration, Pid, BufSec}).
+
%%----------------------------------------------------------------------------
get_user_memory_limit() ->
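
As a usage note, a hedged sketch of how a queue process might call the push_queue_duration/2 API introduced in this hunk; it mirrors the rabbit_amqqueue_process change at the top of this diff, but the function and variable names here are illustrative, not taken from the codebase:

%% Illustrative caller: report how long the current backlog would take to
%% drain, or 'infinity' when no drain rate has been observed yet.
report_queue_duration(QueueLen, MsgPerSec) ->
    BufSec = case MsgPerSec == 0 of
                 true  -> infinity;
                 false -> QueueLen / MsgPerSec
             end,
    rabbit_memory_monitor:push_queue_duration(self(), BufSec).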
@@ -148,8 +152,8 @@ init([]) ->
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
?SERVER, update, []),
{ok, #state{timer = TRef,
- drain_dict = dict:new(),
- drain_avg = infinity,
+ queue_duration_dict = dict:new(),
+ queue_duration_avg = infinity,
memory_limit = MemoryLimit}}.
handle_call(_Request, _From, State) ->
@@ -163,17 +167,18 @@ handle_cast({register, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
{noreply, State};
-handle_cast({push_drain_ratio, Pid, DrainRatio}, State) ->
- gen_server2:cast(Pid, {set_bufsec_limit, State#state.drain_avg}),
- {noreply, State#state{drain_dict =
- dict:store(Pid, DrainRatio, State#state.drain_dict)}};
+handle_cast({push_queue_duration, Pid, DrainRatio}, State) ->
+ gen_server2:cast(Pid, {set_bufsec_limit, State#state.queue_duration_avg}),
+ {noreply, State#state{queue_duration_dict =
+ dict:store(Pid, DrainRatio, State#state.queue_duration_dict)}};
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
- {noreply, State#state{drain_dict = dict:erase(Pid, State#state.drain_dict)}};
+ {noreply, State#state{queue_duration_dict =
+ dict:erase(Pid, State#state.queue_duration_dict)}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -203,18 +208,18 @@ count_average(List) ->
end.
internal_update(State) ->
- %% used memory / available memory
- MemoryOvercommit = erlang:memory(total) / State#state.memory_limit,
-
+ %% available memory / used memory
+ UsedMemory = erlang:memory(total),
+ MemoryOvercommit = State#state.memory_limit / UsedMemory,
RealDrainAvg = count_average([V || {_K, V} <-
- dict:to_list(State#state.drain_dict)]),
+ dict:to_list(State#state.queue_duration_dict)]),
%% In case of no active queues, feel free to grow. We can't make any
%% decisions; we have no clue what the average ram_usage/second is.
%% Nor does the queue.
DesiredDrainAvg = case RealDrainAvg of
infinity -> infinity;
0.0 -> infinity;
- _ -> RealDrainAvg / MemoryOvercommit
+ _ -> RealDrainAvg * MemoryOvercommit
end,
?LOGDEBUG("DrainAvg Real/Desired:~s/~s MemoryOvercommit:~s~n",
[ftoa(RealDrainAvg), ftoa(DesiredDrainAvg),
@@ -228,7 +233,7 @@ internal_update(State) ->
_ -> ok
end
end,
- lists:map(ReduceMemory, dict:to_list(State#state.drain_dict)),
- State#state{drain_avg = DesiredDrainAvg}.
+ lists:map(ReduceMemory, dict:to_list(State#state.queue_duration_dict)),
+ State#state{queue_duration_avg = DesiredDrainAvg}.
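
A quick numeric check of the flip this commit makes (the figures below are made up for illustration): the old code computed RealDrainAvg / (Used/Allowed), the new code computes RealDrainAvg * (Allowed/Used), which is the same value expressed with the more natural available/used ratio:

1> Allowed = 1024, Used = 2048, RealAvg = 20.0.
20.0
2> RealAvg / (Used / Allowed).   % old: divide by used/available
10.0
3> RealAvg * (Allowed / Used).   % new: multiply by available/used
10.0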