diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-03 14:04:55 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-03 14:04:55 +0100 |
commit | be327931a5ef997ff18c4651072a238de303d41d (patch) | |
tree | 8eadfe4bdaca6ae49ab5e3b8e004aebac69864ef | |
parent | e97c983c251463fa96e4adfde51f62bba6e611c3 (diff) | |
download | rabbitmq-server-be327931a5ef997ff18c4651072a238de303d41d.tar.gz |
All done.
Introduced drain explicitly because to do otherwise would have made life even harder. Everything addressed as per bug and IM. Test once for functions being exported and cache
-rw-r--r-- | src/gen_server2.erl | 145 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 |
2 files changed, 82 insertions, 70 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index be2c5730..63b1d908 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -345,7 +345,8 @@ enter_loop(Mod, Options, State) -> enter_loop(Mod, Options, State, self(), infinity, undefined). enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - enter_loop(Mod, Options, State, self(), infinity, Backoff); + Backoff1 = extend_backoff(Mod, Backoff), + enter_loop(Mod, Options, State, self(), infinity, Backoff1); enter_loop(Mod, Options, State, ServerName = {_, _}) -> enter_loop(Mod, Options, State, ServerName, infinity, undefined); @@ -354,7 +355,8 @@ enter_loop(Mod, Options, State, Timeout) -> enter_loop(Mod, Options, State, self(), Timeout, undefined). enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, Backoff); + Backoff1 = extend_backoff(Mod, Backoff), + enter_loop(Mod, Options, State, ServerName, infinity, Backoff1); enter_loop(Mod, Options, State, ServerName, Timeout) -> enter_loop(Mod, Options, State, ServerName, Timeout, undefined). @@ -392,7 +394,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug); + Backoff1 = extend_backoff(Mod, Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -430,6 +433,11 @@ unregister_name({global,Name}) -> unregister_name(Pid) when is_pid(Pid) -> Pid. +extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> + Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1), + Post = erlang:function_exported(Mod, handle_post_hibernate, 1), + {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}. + %%%======================================================================== %%% Internal functions %%%======================================================================== @@ -440,24 +448,25 @@ loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> proc_lib:hibernate(?MODULE,wake_hib, [Parent, Name, State, Mod, undefined, Queue, Debug]); loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue), Debug) - after 0 -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - Queue, Debug, false) + Input -> drain(in(Input,Queue)) + after 0 -> Queue end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Hib) -> +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Hib, Msg); + Time, TimeoutState, Queue1, Debug, Msg); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of - {hibernate, {backoff, Current, _Min, _Desired}} -> + {hibernate, + {backoff, Current, _Min, _Desired, _Pre, _Post}} -> {Current, true}; _ -> {Time, false} end, @@ -474,72 +483,82 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, false -> process_msg( Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, Hib, timeout) + Queue1, Debug, timeout) end end end. wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - Msg = receive - Input -> - Input - end, process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState, - in(Msg, Queue), Debug, true). + drain(Queue), Debug). wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) -> - AwokeAt = now(), - Msg = receive - Input -> - Input - end, - backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - TimeoutState, in(Msg, Queue), Debug). - -backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - case catch Mod:handle_pre_hibernate(State) of - {hibernate, NState} -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod, - now(), TimeoutState, - Queue, Debug]); - {stop, Reason, NState} -> - terminate(Reason, Name, pre_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, pre_hibernate, Mod, State, []); - Reply -> - terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, - State, []) + backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(), + TimeoutState, drain(Queue), Debug). + +backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState = + {backoff, _Current, _Minimum, _Desired, Pre, _Post}, + Queue, Debug) -> + case Pre of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, + Mod, now(), + TimeoutState, Queue, + Debug]); + {stop, Reason, NState} -> + terminate(Reason, Name, pre_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, pre_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, + State, []) + end; + false -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + now(), TimeoutState, Queue, + Debug]) end. backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - {backoff, CurrentTO, MinimumTO, DesiredHibPeriod}, + {backoff, CurrentTO, MinimumTO, DesiredHibPeriod, + Pre, Post}, Queue, Debug) -> NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), CurrentMicros = CurrentTO * 1000, MinimumMicros = MinimumTO * 1000, DesiredHibMicros = DesiredHibPeriod * 1000, - CurrentTO1 = case (NapLengthMicros + CurrentMicros) > - (MinimumMicros + DesiredHibMicros) of - true -> - lists:max([MinimumTO, round(CurrentTO/2)]); - false -> - CurrentTO + MinimumTO - end, - TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod}, - case catch Mod:handle_post_hibernate(State) of - {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState, - Queue, Debug, true); - {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState, - Queue, Debug, true); - {stop, Reason, NState} -> - terminate(Reason, Name, post_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, post_hibernate, Mod, State, []); - Reply -> - terminate({bad_return_value, Reply}, Name, post_hibernate, Mod, - State, []) + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + CurrentTO1 = Base + random:uniform(Base), + TimeoutState = + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post}, + case Post of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, TimeoutState, + Queue, Debug); + {noreply, NState, Time} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue, + Debug); + {stop, Reason, NState} -> + terminate(Reason, Name, post_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, post_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, post_hibernate, + Mod, State, []) + end; + false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue, + Debug) end. in({'$gen_pcast', {Priority, Msg}}, Queue) -> @@ -550,7 +569,7 @@ in(Input, Queue) -> priority_queue:in(Input, Queue). process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, _Hib, Msg) -> + Debug, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6a30503e..fe2e8509 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,6 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([handle_pre_hibernate/1, handle_post_hibernate/1]). -import(queue). -import(erlang). @@ -819,9 +818,3 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. - -handle_pre_hibernate(State) -> - {hibernate, State}. - -handle_post_hibernate(State) -> - {noreply, State, hibernate}. |