summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-06 13:54:17 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-06 13:54:17 +0100
commit3aa8f53f42e346a986f2505e50bc459daae13703 (patch)
tree581399d2784d8a51a4da17004d8261c7a39828d3
parent24e69d8643cf4dab9a80c62dfa9cbc585e895dfc (diff)
downloadrabbitmq-server-3aa8f53f42e346a986f2505e50bc459daae13703.tar.gz
updating gen_server2 with latest from R13B01 in order to ensure this doesn't slip badly behind the shipped version.
-rw-r--r--src/gen_server2.erl91
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl4
3 files changed, 71 insertions, 28 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ba8becfc..7ce81f92 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -116,7 +116,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]).
-export([behaviour_info/1]).
@@ -135,6 +135,8 @@
%%% API
%%%=========================================================================
+-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
+
behaviour_info(callbacks) ->
[{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
{terminate,2},{code_change,3}];
@@ -329,7 +331,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
-init_it(Starter, Parent, Name, Mod, Args, Options) ->
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+ Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
case catch Mod:init(Args) of
@@ -340,12 +343,21 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
{stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
ignore ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{'EXIT', Reason} ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
Else ->
@@ -354,33 +366,56 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
exit(Error)
end.
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+name(Pid) when is_pid(Pid) -> Pid.
+
+unregister_name({local,Name}) ->
+ _ = (catch unregister(Name));
+unregister_name({global,Name}) ->
+ _ = global:unregister_name(Name);
+unregister_name(Pid) when is_pid(Pid) ->
+ Pid.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, hibernate, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
Time, in(Input, Queue), Debug)
after 0 ->
- case priority_queue:out(Queue) of
- {{value, Msg}, Queue1} ->
+ process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, false)
+ end.
+
+process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, Hib) ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, Hib, Msg);
+ {empty, Queue1} ->
+ receive
+ Input ->
+ loop(Parent, Name, State, Mod,
+ Time, in(Input, Queue1), Debug)
+ after Time ->
process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Msg);
- {empty, Queue1} ->
- receive
- Input ->
- loop(Parent, Name, State, Mod,
- Time, in(Input, Queue1), Debug)
- after Time ->
- process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, timeout)
- end
+ Time, Queue1, Debug, Hib, timeout)
end
end.
+wake_hib(Parent, Name, State, Mod, Queue, Debug) ->
+ Msg = receive
+ Input ->
+ Input
+ end,
+ process_next_msg(Parent, Name, State, Mod, hibernate, in(Msg, Queue), Debug, true).
+
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
@@ -388,26 +423,34 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
in(Input, Queue) ->
priority_queue:in(Input, Queue).
-process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, _Hib, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, State, Mod, Time, Queue]);
+ %% gen_server puts Hib on the end as the 7th arg, but that
+ %% version of the function seems not to be documented so
+ %% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ handle_msg(Msg, Parent, Name, State, Mod, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, Queue, Debug1)
end.
%%% ---------------------------------------------------
%%% Send/recive functions
%%% ---------------------------------------------------
do_send(Dest, Msg) ->
- catch erlang:send(Dest, Msg).
+ case catch erlang:send(Dest, Msg, [noconnect]) of
+ noconnect ->
+ spawn(erlang, send, [Dest,Msg]);
+ Other ->
+ Other
+ end.
do_multi_call(Nodes, Name, Req, infinity) ->
Tag = make_ref(),
@@ -598,7 +641,7 @@ dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
@@ -619,12 +662,12 @@ handle_msg({'$gen_call', From, Msg},
Parent, Name, Msg, Mod, State, Queue)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
@@ -650,7 +693,7 @@ handle_msg({'$gen_call', From, Msg},
Parent, Name, Msg, Mod, State, Queue, Debug)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
handle_common_reply(Reply,
Parent, Name, Msg, Mod, State, Queue, Debug).
@@ -699,6 +742,8 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
loop(Parent, Name, State, Mod, Time, Queue, Debug).
+-spec system_terminate(_, _, _, [_]) -> no_return().
+
system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
@@ -747,6 +792,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) ->
exit(normal);
shutdown ->
exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
_ ->
error_info(Reason, Name, Msg, State, Debug),
exit(Reason)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 64498c37..e2dc598f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -852,10 +852,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
handle_info(timeout, State) ->
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
State1 = State #q { hibernated_at = now() },
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
+ {noreply, State1, hibernate};
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3089bb62..33c97292 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -162,9 +162,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
handle_info(timeout, State) ->
ok = clear_permission_cache(),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
+ {noreply, State, hibernate}.
terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
state = terminating}) ->