diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-08-08 23:07:20 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-08-08 23:07:20 +0100 |
commit | b6fa7cd1b0e768af6bcf2ed495819d9ef513a62b (patch) | |
tree | 06020c432289cf2ea6d1b88ecc92d1dddaf4c895 | |
parent | 470d185f1ca1fd57ba3152f96fc42cab4e6b8127 (diff) | |
parent | e608954f1ff8604f944edbf254a1f7ae7d9d5c4f (diff) | |
download | rabbitmq-server-b6fa7cd1b0e768af6bcf2ed495819d9ef513a62b.tar.gz |
merge bug21087 into default
-rw-r--r-- | src/gen_server2.erl | 364 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 |
3 files changed, 300 insertions, 83 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ba8becfc..36fb4fa8 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -21,6 +21,42 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% +%% 5) The callback module can optionally implement +%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be +%% called immediately prior to and post hibernation, respectively. If +%% handle_pre_hibernate returns {hibernate, NewState} then the process +%% will hibernate. If the module does not implement +%% handle_pre_hibernate/1 then the default action is to hibernate. +%% +%% 6) init can return a 4th arg, {backoff, InitialTimeout, +%% MinimumTimeout, DesiredHibernatePeriod} (all in +%% milliseconds). Then, on all callbacks which can return a timeout +%% (including init), timeout can be 'hibernate'. When this is the +%% case, the current timeout value will be used (initially, the +%% InitialTimeout supplied from init). After this timeout has +%% occurred, hibernation will occur as normal. Upon awaking, a new +%% current timeout value will be calculated. +%% +%% The purpose is that the gen_server2 takes care of adjusting the +%% current timeout value such that the process will increase the +%% timeout value repeatedly if it is unable to sleep for the +%% DesiredHibernatePeriod. If it is able to sleep for the +%% DesiredHibernatePeriod it will decrease the current timeout down to +%% the MinimumTimeout, so that the process is put to sleep sooner (and +%% hopefully stays asleep for longer). In short, should a process +%% using this receive a burst of messages, it should not hibernate +%% between those messages, but as the messages become less frequent, +%% the process will not only hibernate, it will do so sooner after +%% each message. +%% +%% When using this backoff mechanism, normal timeout values (i.e. not +%% 'hibernate') can still be used, and if they are used then the +%% handle_info(timeout, State) will be called as normal. In this case, +%% returning 'hibernate' from handle_info(timeout, State) will not +%% hibernate the process immediately, as it would if backoff wasn't +%% being used. Instead it'll wait for the current timeout as described +%% above. + %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -55,6 +91,7 @@ %%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} +%%% {ok, State, Timeout, Backoff} %%% ignore %%% {stop, Reason} %%% @@ -86,6 +123,17 @@ %%% %%% ==> ok %%% +%%% handle_pre_hibernate(State) +%%% +%%% ==> {hibernate, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% handle_post_hibernate(State) +%%% +%%% ==> {noreply, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% The work flow (of the server) can be described as follows: %%% @@ -116,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -290,7 +338,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -301,20 +349,30 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), infinity, undefined). + +enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, infinity, undefined); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -329,23 +387,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -354,33 +426,159 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +%% name(Pid) when is_pid(Pid) -> Pid; +%% when R11 goes away, drop the line beneath and uncomment the line above +name(Name) -> Name. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid. + +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue), Debug) - after 0 -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) + Input -> drain(in(Input, Queue)) + after 0 -> Queue + end. + +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, TimeoutState, Queue1, Debug, Msg); + {empty, Queue1} -> + {Time1, HibOnTimeout} + = case {Time, TimeoutState} of + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> + {Current, true}; + {hibernate, _} -> + %% wake_hib/7 will set Time to hibernate. If + %% we were woken and didn't receive a msg + %% then we will get here and need a sensible + %% value for Time1, otherwise we crash. + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; + _ -> {Time, false} + end, + receive + Input -> + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) + after Time1 -> + case HibOnTimeout of + true -> + pre_hibernate( + Parent, Name, State, Mod, TimeoutState, Queue1, + Debug); + false -> + process_msg( + Parent, Name, State, Mod, Time, TimeoutState, + Queue1, Debug, timeout) end end end. +wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> + TimeoutState1 = case TS of + undefined -> + undefined; + {SleptAt, TimeoutState} -> + adjust_timeout_state(SleptAt, now(), TimeoutState) + end, + post_hibernate(Parent, Name, State, Mod, TimeoutState1, + drain(Queue), Debug). + +hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + TS = case TimeoutState of + undefined -> undefined; + {backoff, _, _, _, _} -> {now(), TimeoutState} + end, + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + TS, Queue, Debug]). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, + Debug); + Reply -> + handle_common_termination(Reply, Name, pre_hibernate, + Mod, State, Debug) + end; + false -> + hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + end. + +post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_post_hibernate, 1) of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState, Queue, Debug); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState, Queue, Debug); + Reply -> + handle_common_termination(Reply, Name, post_hibernate, + Mod, State, Debug) + end; + false -> + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState, Queue, Debug) + end. + +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> @@ -388,19 +586,25 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, + Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + sys:handle_system_msg + (Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, + Debug1) end. %%% --------------------------------------------------- @@ -598,87 +802,95 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); - Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), reply(Name, From, Reply, NState, Debug), exit(R); Other -> - handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, + Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, Debug) + end. + +handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> + case Reply of {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -696,16 +908,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +-ifdef(use_specs). +-spec system_terminate(_, _, _, [_]) -> no_return(). +-endif. + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, + _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, + OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; - Else -> Else + {ok, NewState} -> + {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; + Else -> + Else end. %%----------------------------------------------------------------- @@ -747,6 +967,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) @@ -871,8 +1093,8 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = - StatusData, + [PDict, SysState, Parent, Debug, + [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f..fe2e8509 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,8 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -export([start_link/1]). @@ -101,7 +102,8 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +118,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +noreply(NewState) -> {noreply, NewState, hibernate}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -813,11 +815,6 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); - handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 87664de3..58b94234 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -162,9 +162,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info(timeout, State) -> ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). + {noreply, State, hibernate}. terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, state = terminating}) -> |