summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-02 18:20:05 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-02 18:20:05 +0100
commit3660c5754ad4cda1899f7af346f727d44f3094a9 (patch)
tree45cf69cdadd7bec9cbd160a1f9df626ae32677a0
parent1ab2a563106ede804eb0968e2eabb2df8980cfd0 (diff)
downloadrabbitmq-server-3660c5754ad4cda1899f7af346f727d44f3094a9.tar.gz
wip, dnc.
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_queue_mode_manager.erl108
2 files changed, 89 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bac7cfb5..ebee301e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -595,14 +595,14 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(State = #q { mixed_state = MS }) ->
+report_memory(Hibernating, State = #q { mixed_state = MS }) ->
{MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory(MS),
NewMem = case MSize of
0 -> 1; %% avoid / 0
N -> N
end,
- rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss),
+ rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating),
State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS),
memory_report_timer = undefined }.
@@ -881,7 +881,7 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
noreply(State #q { mixed_state = MS1 });
handle_cast(report_memory, State) ->
- noreply1(report_memory(State)).
+ noreply1(report_memory(false, State)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -902,7 +902,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- State1 = (stop_memory_timer(report_memory(State)))
+ State1 = (stop_memory_timer(report_memory(true, State)))
#q { hibernated_at = now() },
proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 3a55833b..39524978 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/1, report_memory/4]).
+-export([register/1, report_memory/5]).
-define(TOTAL_TOKENS, 1000).
-define(LOW_WATER_MARK_FRACTION, 0.25).
@@ -54,15 +54,18 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(register/1 :: (pid()) -> {'ok', queue_mode()}).
--spec(report_memory/4 :: (pid(), non_neg_integer(),
- non_neg_integer(), non_neg_integer()) -> 'ok').
+-spec(report_memory/5 :: (pid(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer(), bool()) ->
+ 'ok').
-endif.
-record(state, { available_tokens,
available_etokens,
mixed_queues,
- tokens_per_byte
+ tokens_per_byte,
+ low_rate,
+ hibernated
}).
start_link() ->
@@ -71,8 +74,9 @@ start_link() ->
register(Pid) ->
gen_server2:call(?SERVER, {register, Pid}).
-report_memory(Pid, Memory, Gain, Loss) ->
- gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Gain, Loss}).
+report_memory(Pid, Memory, Gain, Loss, Hibernating) ->
+ gen_server2:cast(?SERVER,
+ {report_memory, Pid, Memory, Gain, Loss, Hibernating}).
init([]) ->
process_flag(trap_exit, true),
@@ -84,7 +88,9 @@ init([]) ->
{ok, #state { available_tokens = Avail,
available_etokens = EAvail,
mixed_queues = dict:new(),
- tokens_per_byte = ?TOTAL_TOKENS / MemAvail
+ tokens_per_byte = ?TOTAL_TOKENS / MemAvail,
+ low_rate = sets:new(),
+ hibernated = sets:new()
}}.
handle_call({register, Pid}, _From,
@@ -101,7 +107,7 @@ handle_call({register, Pid}, _From,
end,
{reply, {ok, Result}, State1}.
-handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost},
+handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
State = #state { available_tokens = Avail,
available_etokens = EAvail,
tokens_per_byte = TPB,
@@ -137,11 +143,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost},
true -> %% getting bigger
case Req > Avail1 of
true -> %% go to disk
- rabbit_amqqueue:set_mode(Pid, disk),
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) };
+ attempt_free_from_idle(Req, Pid,
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) });
false -> %% request not too big, stay mixed
State #state { available_tokens = Avail1 - Req,
available_etokens = EAvail1,
@@ -153,11 +159,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost},
true ->
case Req > Avail1 orelse LowRate of
true -> %% go to disk
- rabbit_amqqueue:set_mode(Pid, disk),
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) };
+ attempt_free_from_idle(Req, Pid,
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) });
false -> %% request not too big, stay mixed
State #state { available_tokens = Avail1 - Req,
available_etokens = EAvail1,
@@ -170,11 +176,11 @@ handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost},
EReq = Req - Avail1,
case EReq > EAvail1 of
true -> %% go to disk
- rabbit_amqqueue:set_mode(Pid, disk),
- State #state { available_tokens = Avail1,
- available_etokens = EAvail1,
- mixed_queues =
- dict:erase(Pid, Mixed) };
+ attempt_free_from_idle(Req, Pid,
+ State #state { available_tokens = Avail1,
+ available_etokens = EAvail1,
+ mixed_queues =
+ dict:erase(Pid, Mixed) });
false -> %% request not too big, stay mixed
State #state { available_tokens = 0,
available_etokens = EAvail1 - EReq,
@@ -226,3 +232,59 @@ ceil(N) when N - trunc(N) > 0 ->
1 + trunc(N);
ceil(N) ->
N.
+
+attempt_free_from_idle(Req, Pid, State = #state { available_tokens = Avail,
+ available_etokens = EAvail,
+ low_rate = Lazy,
+ hibernated = Sleepy,
+ mixed_queues = Mixed }) ->
+ case Req > Avail of
+ true ->
+ {Sleepy1, Freed, EFreed, State1} = free_upto(Req, sets:to_list(Sleepy), State),
+ case Req > Avail + Freed of
+ true ->
+ {Lazy1, Freed1, EFreed1, State2} = free_upto(Req, sets:to_list(Lazy), State1),
+ case Req > Avail + Freed + Freed1 of
+ true ->
+ rabbit_amqqueue:set_mode(Pid, disk),
+ State2 #state { available_tokens = Avail + Freed + Freed1,
+ available_etokens = EAvail + EFreed + EFreed1,
+ low_rate = Lazy1,
+ hibernated = Sleepy1,
+ mixed_queues = dict:erase(Pid, Mixed)
+ };
+ false ->
+ State2 #state { available_tokens = Avail + Freed + Freed1 - Req,
+ available_etokens = EAvail + EFreed + EFreed1,
+ low_rate = Lazy1,
+ hibernated = Sleepy1,
+ mixed_queues = dict:store(Pid, {Req, 0}, Mixed)
+ }
+ end;
+ false ->
+ State1 #state { available_tokens = Avail + Freed - Req,
+ available_etokens = EAvail + EFreed,
+ hibernated = Sleepy1,
+ mixed_queues = dict:store(Pid, {Req, 0}, Mixed)
+ }
+ end;
+ false ->
+ State #state { mixed_queues = dict:store(Pid, {Req, 0}, Mixed) }
+ end.
+
+free_upto(Req, List, State) ->
+ free_upto(Req, List, 0, 0, State).
+
+free_upto(_Req, [], Freed, EFreed, State) ->
+ {[], Freed, EFreed, State};
+free_upto(Req, [Pid|Pids], Freed, EFreed, State = #state { available_tokens = Avail,
+ mixed_queues = Mixed }) ->
+ {mixed, {Alloc, EAlloc}} = find_queue(Pid, State),
+ rabbit_amqqueue:set_mode(Pid, disk),
+ State1 = State #state { mixed_queues = dict:erase(Pid, Mixed) },
+ case Req > Avail + Freed + Alloc of
+ true ->
+ free_upto(Req, Pids, Freed + Alloc, EFreed + EAlloc, State1);
+ false ->
+ {Pids, Freed + Alloc, EFreed + EAlloc, State1}
+ end.