summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-06 12:49:25 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-06 12:49:25 +0100
commit98f2a335b6da25baa3f23bb2404b3809301477a5 (patch)
treec7523cb2de831cea3028993137f0041c2bb4dbbf
parent28bdcd202ee146abb25fca9fea5b640aebb2455f (diff)
downloadrabbitmq-server-98f2a335b6da25baa3f23bb2404b3809301477a5.tar.gz
All done.
-rw-r--r--src/gen_server2.erl146
1 files changed, 80 insertions, 66 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 529ed029..4b610506 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -345,8 +345,7 @@ enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity, undefined).
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
- Backoff1 = extend_backoff(Mod, Backoff),
- enter_loop(Mod, Options, State, self(), infinity, Backoff1);
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity, undefined);
@@ -355,8 +354,7 @@ enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout, undefined).
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
- Backoff1 = extend_backoff(Mod, Backoff),
- enter_loop(Mod, Options, State, ServerName, infinity, Backoff1);
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
@@ -366,7 +364,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug).
+ Backoff1 = extend_backoff(Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -393,8 +392,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
{ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ Backoff1 = extend_backoff(Backoff),
proc_lib:init_ack(Starter, {ok, self()}),
- Backoff1 = extend_backoff(Mod, Backoff),
loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
@@ -433,11 +432,10 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
-extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
- Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1),
- Post = erlang:function_exported(Mod, handle_post_hibernate, 1),
- random:seed(now()), %% call before we get into the loop
- {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}.
+extend_backoff(undefined) ->
+ undefined;
+extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
%%%========================================================================
%%% Internal functions
@@ -446,8 +444,7 @@ extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
%%% The MAIN loop.
%%% ---------------------------------------------------
loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,
- [Parent, Name, State, Mod, undefined, Queue, Debug]);
+ pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
drain(Queue), Debug).
@@ -466,29 +463,29 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
- {hibernate,
- {backoff, Current, _Min, _Desired, _Pre, _Post}} ->
+ {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
{Current, true};
{hibernate, _} ->
%% wake_hib/7 will set Time to hibernate. If
%% we were woken and didn't receive a msg
%% then we will get here and need a sensible
%% value for Time1, otherwise we crash.
- %% On the grounds that it's better to get
- %% control back to the user module sooner
- %% rather than later, 0 is more sensible
- %% than infinity here.
- {0, false};
+ %% R13B1 always waits infinitely when waking
+ %% from hibernation, so that's what we do
+ %% here too.
+ {infinity, false};
_ -> {Time, false}
end,
receive
Input ->
- loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue1), Debug)
+ %% Time could be 'hibernate' here, so *don't* call loop
+ process_next_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ drain(in(Input, Queue1)), Debug)
after Time1 ->
case HibOnTimeout of
true ->
- backoff_pre_hibernate(
+ pre_hibernate(
Parent, Name, State, Mod, TimeoutState, Queue1,
Debug);
false ->
@@ -499,25 +496,33 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
end
end.
-wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
- drain(Queue), Debug).
+wake_hib(Parent, Name, State, Mod, undefined, Queue, Debug) ->
+ post_hibernate(Parent, Name, State, Mod, undefined, undefined, undefined,
+ drain(Queue), Debug).
wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
- backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(),
- TimeoutState, drain(Queue), Debug).
-
-backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
- {backoff, _Current, _Minimum, _Desired, Pre, _Post},
- Queue, Debug) ->
- case Pre of
+ WokenAt = now(), %% enforce this gets called before drain/1
+ post_hibernate(Parent, Name, State, Mod, SleptAt, WokenAt,
+ TimeoutState, drain(Queue), Debug).
+
+pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ ArgsFun =
+ case TimeoutState of
+ undefined ->
+ fun (NState) ->
+ [Parent, Name, NState, Mod, TimeoutState, Queue, Debug]
+ end;
+ {backoff, _, _, _, _} ->
+ fun (NState) ->
+ [Parent, Name, NState, Mod, now(), TimeoutState, Queue,
+ Debug]
+ end
+ end,
+ case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
true ->
case catch Mod:handle_pre_hibernate(State) of
{hibernate, NState} ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState,
- Mod, now(),
- TimeoutState, Queue,
- Debug]);
+ proc_lib:hibernate(?MODULE, wake_hib, ArgsFun(NState));
{stop, Reason, NState} ->
terminate(Reason, Name, pre_hibernate, Mod, NState, []);
{'EXIT', What} ->
@@ -527,39 +532,21 @@ backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
State, [])
end;
false ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
- now(), TimeoutState, Queue,
- Debug])
+ proc_lib:hibernate(?MODULE, wake_hib, ArgsFun(State))
end.
-backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- {backoff, CurrentTO, MinimumTO, DesiredHibPeriod,
- Pre, Post},
- Queue, Debug) ->
- NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
- CurrentMicros = CurrentTO * 1000,
- MinimumMicros = MinimumTO * 1000,
- DesiredHibMicros = DesiredHibPeriod * 1000,
- GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
- Base =
- %% If enough time has passed between the last two messages then we
- %% should consider sleeping sooner. Otherwise stay awake longer.
- case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
- true -> lists:max([MinimumTO, CurrentTO div 2]);
- false -> CurrentTO
- end,
- CurrentTO1 = Base + random:uniform(Base),
- TimeoutState =
- {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post},
- case Post of
+post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, TimeoutState, Queue,
+ Debug) ->
+ TimeoutState1 = adjust_timeout_state(SleptAt, AwokeAt, TimeoutState),
+ case erlang:function_exported(Mod, handle_post_hibernate, 1) of
true ->
case catch Mod:handle_post_hibernate(State) of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState,
- Queue, Debug);
+ process_next_msg(Parent, Name, NState, Mod, infinity,
+ TimeoutState1, Queue, Debug);
{noreply, NState, Time} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue,
- Debug);
+ process_next_msg(Parent, Name, NState, Mod, Time,
+ TimeoutState1, Queue, Debug);
{stop, Reason, NState} ->
terminate(Reason, Name, post_hibernate, Mod, NState, []);
{'EXIT', What} ->
@@ -568,10 +555,37 @@ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
terminate({bad_return_value, Reply}, Name, post_hibernate,
Mod, State, [])
end;
- false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue,
- Debug)
+ false ->
+ %% use hibernate here, not infinity. This matches
+ %% R13B. The key is that we should be able to get through
+ %% to process_msg calling sys:handle_system_msg with Time
+ %% still set to hibernate, iff that msg is the very msg
+ %% that woke us up (or the first msg we receive after
+ %% waking up).
+ process_next_msg(Parent, Name, State, Mod, hibernate,
+ TimeoutState1, Queue, Debug)
end.
+adjust_timeout_state(undefined, undefined, undefined) ->
+ undefined;
+adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
+ DesiredHibPeriod, RandomState}) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ {Extra, RandomState1} = random:uniform_s(Base, RandomState),
+ CurrentTO1 = Base + Extra,
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
+
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->