summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-06 16:39:28 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-06 16:39:28 +0100
commit5c445726499c98c62f1146151fb7109538693acd (patch)
tree90a0522458d424b588d9562cf274fe59d9dff0a1
parent3aa8f53f42e346a986f2505e50bc459daae13703 (diff)
downloadrabbitmq-server-5c445726499c98c62f1146151fb7109538693acd.tar.gz
Pushed the binary exponential timeout / hibernate system into gen_server2. Adjusted amqqueue_process to use it. Added documentation. Tested thoroughly with explicit test module (not added), and full test suite, which all passed. Existing tests further up in this bug similarly pass and demonstrate code is functioning correctly.
-rw-r--r--src/gen_server2.erl162
-rw-r--r--src/rabbit_amqqueue_process.erl47
2 files changed, 117 insertions, 92 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 7ce81f92..dc1e8691 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1,4 +1,4 @@
-%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is gen_server2
@@ -21,6 +21,23 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
+%% 5) On return from init/1, the timeout value {binary, Min} creates a
+%% binary exponential timeout, where Min is the minimum number of
+%% milliseconds permitted, and is also used as the current timeout
+%% value. Returning from handle_* with the timeout value set to
+%% 'binary' will use the current binary timeout value. handle_info/2
+%% with the Info of 'timeout' will function normally, and supports the
+%% return value of {noreply, State, hibernate} which will hibernate
+%% the process. The current timeout value is:
+%%
+%% a) doubled if the time spent in hibernation is < 4 * the current value;
+%% b) halved if the time spent in hibernation is > 16 * the current value;
+%% c) maintained in all other cases
+%%
+%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
+%% are still supported, and do not have any effect on the current
+%% timeout value.
+
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -116,7 +133,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -316,7 +333,12 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+ TimeoutState = case Timeout of
+ {binary, Min} ->
+ {Min, Min, undefined};
+ _ -> undefined
+ end,
+ loop(Parent, Name, State, Mod, Timeout, TimeoutState, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -338,10 +360,15 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ TimeoutState = case Timeout of
+ {binary, Min} ->
+ {Min, Min, undefined};
+ _ -> undefined
+ end,
+ loop(Parent, Name, State, Mod, binary, TimeoutState, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -383,38 +410,66 @@ unregister_name(Pid) when is_pid(Pid) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, Queue, Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Queue, Debug]);
-loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, undefined, Queue, Debug]);
+loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, {Current, Min, now()}, Queue, Debug]);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, in(Input, Queue), Debug)
+ Time, TimeoutState, in(Input, Queue), Debug)
after 0 ->
- process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, false)
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, false)
end.
-process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, Hib) ->
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Hib) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Hib, Msg);
+ Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
+ Time1 = case {Time, TimeoutState} of
+ {binary, {Current, _Min, undefined}} -> Current;
+ _ -> Time
+ end,
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, in(Input, Queue1), Debug)
- after Time ->
+ Time, TimeoutState, in(Input, Queue1), Debug)
+ after Time1 ->
process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Hib, timeout)
+ Time, TimeoutState, Queue1, Debug, Hib, timeout)
end
end.
-wake_hib(Parent, Name, State, Mod, Queue, Debug) ->
+wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Msg = receive
Input ->
Input
end,
- process_next_msg(Parent, Name, State, Mod, hibernate, in(Msg, Queue), Debug, true).
+ TimeoutState1 = adjust_hibernate_after(TimeoutState),
+ process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, in(Msg, Queue), Debug, true).
+
+adjust_hibernate_after(undefined) ->
+ undefined;
+adjust_hibernate_after({Current, Min, HibernatedAt}) ->
+ NapLengthMicros = timer:now_diff(now(), HibernatedAt),
+ CurrentMicros = Current * 1000,
+ LowTargetMicros = CurrentMicros * 4,
+ HighTargetMicros = LowTargetMicros * 4,
+ if
+ NapLengthMicros < LowTargetMicros ->
+ %% nap was too short, don't go to sleep as soon
+ {Current * 2, Min, undefined};
+
+ NapLengthMicros > HighTargetMicros ->
+ %% nap was long, try going to sleep sooner
+ {lists:max([Min, round(Current / 2)]), Min, undefined};
+
+ true ->
+ %% nap and timeout seem to be in the right relationship. stay here
+ {Current, Min, undefined}
+ end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
@@ -423,22 +478,22 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
in(Input, Queue) ->
priority_queue:in(Input, Queue).
-process_msg(Parent, Name, State, Mod, Time, Queue, Debug, _Hib, Msg) ->
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, _Hib, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, Queue]);
+ [Name, State, Mod, Time, TimeoutState, Queue]);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Queue);
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Queue, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, Debug1)
end.
%%% ---------------------------------------------------
@@ -641,48 +696,48 @@ dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
Other -> handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue)
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
@@ -690,20 +745,20 @@ handle_msg({'$gen_call', From, Msg},
exit(R);
Other ->
handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue, Debug)
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
handle_common_reply(Reply,
- Parent, Name, Msg, Mod, State, Queue, Debug).
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', What} ->
@@ -712,16 +767,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) ->
case Reply of
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -739,17 +794,17 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
- loop(Parent, Name, State, Mod, Time, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
-spec system_terminate(_, _, _, [_]) -> no_return().
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
-system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
Else -> Else
end.
@@ -918,7 +973,7 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, TimeoutState, Queue]] =
StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
@@ -937,9 +992,14 @@ format_status(Opt, StatusData) ->
_ ->
[{data, [{"State", State}]}]
end,
+ Specfic1 = case TimeoutState of
+ undefined -> Specfic;
+ {Current, Min, undefined} ->
+ [{"Binary Timeout Current and Min", {Current, Min}} | Specfic]
+ end,
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
- Specfic].
+ Specfic1].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e2dc598f..0180d86a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,9 +54,7 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers,
- hibernate_after,
- hibernated_at
+ blocked_consumers
}).
-record(consumer, {tag, ack_required}).
@@ -104,10 +102,8 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- hibernate_after = ?HIBERNATE_AFTER_MIN,
- hibernated_at = undefined
- }, ?HIBERNATE_AFTER_MIN}.
+ blocked_consumers = queue:new()
+ }, {binary, ?HIBERNATE_AFTER_MIN}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -122,41 +118,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState = #q { hibernated_at = undefined }) ->
- {reply, Reply, NewState, NewState #q.hibernate_after};
reply(Reply, NewState) ->
- NewState1 = adjust_hibernate_after(NewState),
- {reply, Reply, NewState1, NewState1 #q.hibernate_after}.
+ {reply, Reply, NewState, binary}.
-noreply(NewState = #q { hibernated_at = undefined }) ->
- {noreply, NewState, NewState #q.hibernate_after};
noreply(NewState) ->
- NewState1 = adjust_hibernate_after(NewState),
- {noreply, NewState1, NewState1 #q.hibernate_after}.
-
-adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
- State;
-adjust_hibernate_after(State = #q { hibernated_at = Then,
- hibernate_after = Timeout }) ->
- State1 = State #q { hibernated_at = undefined },
- NapLengthMicros = timer:now_diff(now(), Then),
- TimeoutMicros = Timeout * 1000,
- LowTargetMicros = TimeoutMicros * 4,
- HighTargetMicros = LowTargetMicros * 4,
- if
- NapLengthMicros < LowTargetMicros ->
- %% nap was too short, don't go to sleep as soon
- State1 #q { hibernate_after = Timeout * 2 };
-
- NapLengthMicros > HighTargetMicros ->
- %% nap was long, try going to sleep sooner
- Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]),
- State1 #q { hibernate_after = Timeout1 };
-
- true ->
- %% nap and timeout seem to be in the right relationship. stay here
- State1
- end.
+ {noreply, NewState, binary}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -852,8 +818,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
handle_info(timeout, State) ->
- State1 = State #q { hibernated_at = now() },
- {noreply, State1, hibernate};
+ {noreply, State, hibernate};
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),