diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-07-02 18:20:05 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-07-02 18:20:05 +0100 |
commit | 3660c5754ad4cda1899f7af346f727d44f3094a9 (patch) | |
tree | 45cf69cdadd7bec9cbd160a1f9df626ae32677a0 | |
parent | 1ab2a563106ede804eb0968e2eabb2df8980cfd0 (diff) | |
download | rabbitmq-server-3660c5754ad4cda1899f7af346f727d44f3094a9.tar.gz |
wip, dnc.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_queue_mode_manager.erl | 108 |
2 files changed, 89 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bac7cfb5..ebee301e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -595,14 +595,14 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). -report_memory(State = #q { mixed_state = MS }) -> +report_memory(Hibernating, State = #q { mixed_state = MS }) -> {MSize, Gain, Loss} = rabbit_mixed_queue:estimate_queue_memory(MS), NewMem = case MSize of 0 -> 1; %% avoid / 0 N -> N end, - rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss), + rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating), State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS), memory_report_timer = undefined }. @@ -881,7 +881,7 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> noreply(State #q { mixed_state = MS1 }); handle_cast(report_memory, State) -> - noreply1(report_memory(State)). + noreply1(report_memory(false, State)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -902,7 +902,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - State1 = (stop_memory_timer(report_memory(State))) + State1 = (stop_memory_timer(report_memory(true, State))) #q { hibernated_at = now() }, proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 3a55833b..39524978 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/1, report_memory/4]). +-export([register/1, report_memory/5]). -define(TOTAL_TOKENS, 1000). -define(LOW_WATER_MARK_FRACTION, 0.25). @@ -54,15 +54,18 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(register/1 :: (pid()) -> {'ok', queue_mode()}). --spec(report_memory/4 :: (pid(), non_neg_integer(), - non_neg_integer(), non_neg_integer()) -> 'ok'). +-spec(report_memory/5 :: (pid(), non_neg_integer(), + non_neg_integer(), non_neg_integer(), bool()) -> + 'ok'). -endif. -record(state, { available_tokens, available_etokens, mixed_queues, - tokens_per_byte + tokens_per_byte, + low_rate, + hibernated }). start_link() -> @@ -71,8 +74,9 @@ start_link() -> register(Pid) -> gen_server2:call(?SERVER, {register, Pid}). -report_memory(Pid, Memory, Gain, Loss) -> - gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Gain, Loss}). +report_memory(Pid, Memory, Gain, Loss, Hibernating) -> + gen_server2:cast(?SERVER, + {report_memory, Pid, Memory, Gain, Loss, Hibernating}). init([]) -> process_flag(trap_exit, true), @@ -84,7 +88,9 @@ init([]) -> {ok, #state { available_tokens = Avail, available_etokens = EAvail, mixed_queues = dict:new(), - tokens_per_byte = ?TOTAL_TOKENS / MemAvail + tokens_per_byte = ?TOTAL_TOKENS / MemAvail, + low_rate = sets:new(), + hibernated = sets:new() }}. handle_call({register, Pid}, _From, @@ -101,7 +107,7 @@ handle_call({register, Pid}, _From, end, {reply, {ok, Result}, State1}. -handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost}, +handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, State = #state { available_tokens = Avail, available_etokens = EAvail, tokens_per_byte = TPB, @@ -137,11 +143,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost}, true -> %% getting bigger case Req > Avail1 of true -> %% go to disk - rabbit_amqqueue:set_mode(Pid, disk), - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }; + attempt_free_from_idle(Req, Pid, + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }); false -> %% request not too big, stay mixed State #state { available_tokens = Avail1 - Req, available_etokens = EAvail1, @@ -153,11 +159,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost}, true -> case Req > Avail1 orelse LowRate of true -> %% go to disk - rabbit_amqqueue:set_mode(Pid, disk), - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }; + attempt_free_from_idle(Req, Pid, + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }); false -> %% request not too big, stay mixed State #state { available_tokens = Avail1 - Req, available_etokens = EAvail1, @@ -170,11 +176,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost}, EReq = Req - Avail1, case EReq > EAvail1 of true -> %% go to disk - rabbit_amqqueue:set_mode(Pid, disk), - State #state { available_tokens = Avail1, - available_etokens = EAvail1, - mixed_queues = - dict:erase(Pid, Mixed) }; + attempt_free_from_idle(Req, Pid, + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }); false -> %% request not too big, stay mixed State #state { available_tokens = 0, available_etokens = EAvail1 - EReq, @@ -226,3 +232,59 @@ ceil(N) when N - trunc(N) > 0 -> 1 + trunc(N); ceil(N) -> N. + +attempt_free_from_idle(Req, Pid, State = #state { available_tokens = Avail, + available_etokens = EAvail, + low_rate = Lazy, + hibernated = Sleepy, + mixed_queues = Mixed }) -> + case Req > Avail of + true -> + {Sleepy1, Freed, EFreed, State1} = free_upto(Req, sets:to_list(Sleepy), State), + case Req > Avail + Freed of + true -> + {Lazy1, Freed1, EFreed1, State2} = free_upto(Req, sets:to_list(Lazy), State1), + case Req > Avail + Freed + Freed1 of + true -> + rabbit_amqqueue:set_mode(Pid, disk), + State2 #state { available_tokens = Avail + Freed + Freed1, + available_etokens = EAvail + EFreed + EFreed1, + low_rate = Lazy1, + hibernated = Sleepy1, + mixed_queues = dict:erase(Pid, Mixed) + }; + false -> + State2 #state { available_tokens = Avail + Freed + Freed1 - Req, + available_etokens = EAvail + EFreed + EFreed1, + low_rate = Lazy1, + hibernated = Sleepy1, + mixed_queues = dict:store(Pid, {Req, 0}, Mixed) + } + end; + false -> + State1 #state { available_tokens = Avail + Freed - Req, + available_etokens = EAvail + EFreed, + hibernated = Sleepy1, + mixed_queues = dict:store(Pid, {Req, 0}, Mixed) + } + end; + false -> + State #state { mixed_queues = dict:store(Pid, {Req, 0}, Mixed) } + end. + +free_upto(Req, List, State) -> + free_upto(Req, List, 0, 0, State). + +free_upto(_Req, [], Freed, EFreed, State) -> + {[], Freed, EFreed, State}; +free_upto(Req, [Pid|Pids], Freed, EFreed, State = #state { available_tokens = Avail, + mixed_queues = Mixed }) -> + {mixed, {Alloc, EAlloc}} = find_queue(Pid, State), + rabbit_amqqueue:set_mode(Pid, disk), + State1 = State #state { mixed_queues = dict:erase(Pid, Mixed) }, + case Req > Avail + Freed + Alloc of + true -> + free_upto(Req, Pids, Freed + Alloc, EFreed + EAlloc, State1); + false -> + {Pids, Freed + Alloc, EFreed + EAlloc, State1} + end. |