summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-03 14:04:55 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-03 14:04:55 +0100
commitbe327931a5ef997ff18c4651072a238de303d41d (patch)
tree8eadfe4bdaca6ae49ab5e3b8e004aebac69864ef
parente97c983c251463fa96e4adfde51f62bba6e611c3 (diff)
downloadrabbitmq-server-be327931a5ef997ff18c4651072a238de303d41d.tar.gz
All done.
Introduced drain explicitly because to do otherwise would have made life even harder. Everything addressed as per bug and IM. Test once for functions being exported and cache
-rw-r--r--src/gen_server2.erl145
-rw-r--r--src/rabbit_amqqueue_process.erl7
2 files changed, 82 insertions, 70 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index be2c5730..63b1d908 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -345,7 +345,8 @@ enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity, undefined).
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
- enter_loop(Mod, Options, State, self(), infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, self(), infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity, undefined);
@@ -354,7 +355,8 @@ enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout, undefined).
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
@@ -392,7 +394,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
{ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -430,6 +433,11 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
+extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
+ Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1),
+ Post = erlang:function_exported(Mod, handle_post_hibernate, 1),
+ {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
@@ -440,24 +448,25 @@ loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, State, Mod, undefined, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
receive
- Input -> loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue), Debug)
- after 0 ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- Queue, Debug, false)
+ Input -> drain(in(Input,Queue))
+ after 0 -> Queue
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Hib) ->
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, Msg);
+ Time, TimeoutState, Queue1, Debug, Msg);
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
- {hibernate, {backoff, Current, _Min, _Desired}} ->
+ {hibernate,
+ {backoff, Current, _Min, _Desired, _Pre, _Post}} ->
{Current, true};
_ -> {Time, false}
end,
@@ -474,72 +483,82 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
false ->
process_msg(
Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, Hib, timeout)
+ Queue1, Debug, timeout)
end
end
end.
wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- Msg = receive
- Input ->
- Input
- end,
process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
- in(Msg, Queue), Debug, true).
+ drain(Queue), Debug).
wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
- AwokeAt = now(),
- Msg = receive
- Input ->
- Input
- end,
- backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- TimeoutState, in(Msg, Queue), Debug).
-
-backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- case catch Mod:handle_pre_hibernate(State) of
- {hibernate, NState} ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod,
- now(), TimeoutState,
- Queue, Debug]);
- {stop, Reason, NState} ->
- terminate(Reason, Name, pre_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, pre_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
- State, [])
+ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(),
+ TimeoutState, drain(Queue), Debug).
+
+backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
+ {backoff, _Current, _Minimum, _Desired, Pre, _Post},
+ Queue, Debug) ->
+ case Pre of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState,
+ Mod, now(),
+ TimeoutState, Queue,
+ Debug]);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, pre_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, pre_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
+ State, [])
+ end;
+ false ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ now(), TimeoutState, Queue,
+ Debug])
end.
backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- {backoff, CurrentTO, MinimumTO, DesiredHibPeriod},
+ {backoff, CurrentTO, MinimumTO, DesiredHibPeriod,
+ Pre, Post},
Queue, Debug) ->
NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
DesiredHibMicros = DesiredHibPeriod * 1000,
- CurrentTO1 = case (NapLengthMicros + CurrentMicros) >
- (MinimumMicros + DesiredHibMicros) of
- true ->
- lists:max([MinimumTO, round(CurrentTO/2)]);
- false ->
- CurrentTO + MinimumTO
- end,
- TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod},
- case catch Mod:handle_post_hibernate(State) of
- {noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState,
- Queue, Debug, true);
- {noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState,
- Queue, Debug, true);
- {stop, Reason, NState} ->
- terminate(Reason, Name, post_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, post_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, post_hibernate, Mod,
- State, [])
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ CurrentTO1 = Base + random:uniform(Base),
+ TimeoutState =
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post},
+ case Post of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState,
+ Queue, Debug);
+ {noreply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue,
+ Debug);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, post_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, post_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, post_hibernate,
+ Mod, State, [])
+ end;
+ false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue,
+ Debug)
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
@@ -550,7 +569,7 @@ in(Input, Queue) ->
priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, _Hib, Msg) ->
+ Debug, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6a30503e..fe2e8509 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,6 @@
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--export([handle_pre_hibernate/1, handle_post_hibernate/1]).
-import(queue).
-import(erlang).
@@ -819,9 +818,3 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-
-handle_pre_hibernate(State) ->
- {hibernate, State}.
-
-handle_post_hibernate(State) ->
- {noreply, State, hibernate}.