diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-06 12:49:25 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-06 12:49:25 +0100 |
commit | 98f2a335b6da25baa3f23bb2404b3809301477a5 (patch) | |
tree | c7523cb2de831cea3028993137f0041c2bb4dbbf | |
parent | 28bdcd202ee146abb25fca9fea5b640aebb2455f (diff) | |
download | rabbitmq-server-98f2a335b6da25baa3f23bb2404b3809301477a5.tar.gz |
All done.
-rw-r--r-- | src/gen_server2.erl | 146 |
1 files changed, 80 insertions, 66 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 529ed029..4b610506 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -345,8 +345,7 @@ enter_loop(Mod, Options, State) -> enter_loop(Mod, Options, State, self(), infinity, undefined). enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - Backoff1 = extend_backoff(Mod, Backoff), - enter_loop(Mod, Options, State, self(), infinity, Backoff1); + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> enter_loop(Mod, Options, State, ServerName, infinity, undefined); @@ -355,8 +354,7 @@ enter_loop(Mod, Options, State, Timeout) -> enter_loop(Mod, Options, State, self(), Timeout, undefined). enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - Backoff1 = extend_backoff(Mod, Backoff), - enter_loop(Mod, Options, State, ServerName, infinity, Backoff1); + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> enter_loop(Mod, Options, State, ServerName, Timeout, undefined). @@ -366,7 +364,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug). + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -393,8 +392,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> proc_lib:init_ack(Starter, {ok, self()}), loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), proc_lib:init_ack(Starter, {ok, self()}), - Backoff1 = extend_backoff(Mod, Backoff), loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the @@ -433,11 +432,10 @@ unregister_name({global,Name}) -> unregister_name(Pid) when is_pid(Pid) -> Pid. -extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> - Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1), - Post = erlang:function_exported(Mod, handle_post_hibernate, 1), - random:seed(now()), %% call before we get into the loop - {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}. +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. %%%======================================================================== %%% Internal functions @@ -446,8 +444,7 @@ extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> %%% The MAIN loop. %%% --------------------------------------------------- loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - proc_lib:hibernate(?MODULE,wake_hib, - [Parent, Name, State, Mod, undefined, Queue, Debug]); + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, drain(Queue), Debug). @@ -466,29 +463,29 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of - {hibernate, - {backoff, Current, _Min, _Desired, _Pre, _Post}} -> + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> {Current, true}; {hibernate, _} -> %% wake_hib/7 will set Time to hibernate. If %% we were woken and didn't receive a msg %% then we will get here and need a sensible %% value for Time1, otherwise we crash. - %% On the grounds that it's better to get - %% control back to the user module sooner - %% rather than later, 0 is more sensible - %% than infinity here. - {0, false}; + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; _ -> {Time, false} end, receive Input -> - loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue1), Debug) + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) after Time1 -> case HibOnTimeout of true -> - backoff_pre_hibernate( + pre_hibernate( Parent, Name, State, Mod, TimeoutState, Queue1, Debug); false -> @@ -499,25 +496,33 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> end end. -wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState, - drain(Queue), Debug). +wake_hib(Parent, Name, State, Mod, undefined, Queue, Debug) -> + post_hibernate(Parent, Name, State, Mod, undefined, undefined, undefined, + drain(Queue), Debug). wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) -> - backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(), - TimeoutState, drain(Queue), Debug). - -backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState = - {backoff, _Current, _Minimum, _Desired, Pre, _Post}, - Queue, Debug) -> - case Pre of + WokenAt = now(), %% enforce this gets called before drain/1 + post_hibernate(Parent, Name, State, Mod, SleptAt, WokenAt, + TimeoutState, drain(Queue), Debug). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + ArgsFun = + case TimeoutState of + undefined -> + fun (NState) -> + [Parent, Name, NState, Mod, TimeoutState, Queue, Debug] + end; + {backoff, _, _, _, _} -> + fun (NState) -> + [Parent, Name, NState, Mod, now(), TimeoutState, Queue, + Debug] + end + end, + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, - Mod, now(), - TimeoutState, Queue, - Debug]); + proc_lib:hibernate(?MODULE, wake_hib, ArgsFun(NState)); {stop, Reason, NState} -> terminate(Reason, Name, pre_hibernate, Mod, NState, []); {'EXIT', What} -> @@ -527,39 +532,21 @@ backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState = State, []) end; false -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - now(), TimeoutState, Queue, - Debug]) + proc_lib:hibernate(?MODULE, wake_hib, ArgsFun(State)) end. -backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - {backoff, CurrentTO, MinimumTO, DesiredHibPeriod, - Pre, Post}, - Queue, Debug) -> - NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), - CurrentMicros = CurrentTO * 1000, - MinimumMicros = MinimumTO * 1000, - DesiredHibMicros = DesiredHibPeriod * 1000, - GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, - Base = - %% If enough time has passed between the last two messages then we - %% should consider sleeping sooner. Otherwise stay awake longer. - case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of - true -> lists:max([MinimumTO, CurrentTO div 2]); - false -> CurrentTO - end, - CurrentTO1 = Base + random:uniform(Base), - TimeoutState = - {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post}, - case Post of +post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, TimeoutState, Queue, + Debug) -> + TimeoutState1 = adjust_timeout_state(SleptAt, AwokeAt, TimeoutState), + case erlang:function_exported(Mod, handle_post_hibernate, 1) of true -> case catch Mod:handle_post_hibernate(State) of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, - Queue, Debug); + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState1, Queue, Debug); {noreply, NState, Time} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue, - Debug); + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState1, Queue, Debug); {stop, Reason, NState} -> terminate(Reason, Name, post_hibernate, Mod, NState, []); {'EXIT', What} -> @@ -568,10 +555,37 @@ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, terminate({bad_return_value, Reply}, Name, post_hibernate, Mod, State, []) end; - false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue, - Debug) + false -> + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState1, Queue, Debug) end. +adjust_timeout_state(undefined, undefined, undefined) -> + undefined; +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> |