diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-07-06 13:54:17 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-07-06 13:54:17 +0100 |
commit | 3aa8f53f42e346a986f2505e50bc459daae13703 (patch) | |
tree | 581399d2784d8a51a4da17004d8261c7a39828d3 | |
parent | 24e69d8643cf4dab9a80c62dfa9cbc585e895dfc (diff) | |
download | rabbitmq-server-3aa8f53f42e346a986f2505e50bc459daae13703.tar.gz |
updating gen_server2 with latest from R13B01 in order to ensure this doesn't slip badly behind the shipped version.
-rw-r--r-- | src/gen_server2.erl | 91 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 |
3 files changed, 71 insertions, 28 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ba8becfc..7ce81f92 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -116,7 +116,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]). -export([behaviour_info/1]). @@ -135,6 +135,8 @@ %%% API %%%========================================================================= +-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}]. + behaviour_info(callbacks) -> [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, {terminate,2},{code_change,3}]; @@ -329,7 +331,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), case catch Mod:init(Args) of @@ -340,12 +343,21 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> proc_lib:init_ack(Starter, {ok, self()}), loop(Parent, Name, State, Mod, Timeout, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -354,33 +366,56 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +name(Pid) when is_pid(Pid) -> Pid. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- +loop(Parent, Name, State, Mod, hibernate, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Queue, Debug]); loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, Time, in(Input, Queue), Debug) after 0 -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> + process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, false) + end. + +process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, Hib) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Hib, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, in(Input, Queue1), Debug) + after Time -> process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) - end + Time, Queue1, Debug, Hib, timeout) end end. +wake_hib(Parent, Name, State, Mod, Queue, Debug) -> + Msg = receive + Input -> + Input + end, + process_next_msg(Parent, Name, State, Mod, hibernate, in(Msg, Queue), Debug, true). + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> @@ -388,26 +423,34 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, _Hib, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, [Name, State, Mod, Time, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, Queue, Debug1) end. %%% --------------------------------------------------- %%% Send/recive functions %%% --------------------------------------------------- do_send(Dest, Msg) -> - catch erlang:send(Dest, Msg). + case catch erlang:send(Dest, Msg, [noconnect]) of + noconnect -> + spawn(erlang, send, [Dest,Msg]); + Other -> + Other + end. do_multi_call(Nodes, Name, Req, infinity) -> Tag = make_ref(), @@ -598,7 +641,7 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), @@ -619,12 +662,12 @@ handle_msg({'$gen_call', From, Msg}, Parent, Name, Msg, Mod, State, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), @@ -650,7 +693,7 @@ handle_msg({'$gen_call', From, Msg}, Parent, Name, Msg, Mod, State, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug). @@ -699,6 +742,8 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> loop(Parent, Name, State, Mod, Time, Queue, Debug). +-spec system_terminate(_, _, _, [_]) -> no_return(). + system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). @@ -747,6 +792,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 64498c37..e2dc598f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -852,10 +852,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); handle_info(timeout, State) -> - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; State1 = State #q { hibernated_at = now() }, - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); + {noreply, State1, hibernate}; handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3089bb62..33c97292 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -162,9 +162,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info(timeout, State) -> ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). + {noreply, State, hibernate}. terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, state = terminating}) -> |