summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-15 17:21:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-15 17:21:20 +0100
commit96ff740ff960d5a7aefb816b909491af2b1937a6 (patch)
tree092970154aee0a16b418027822dd8e023b9a6d00
parent5e375269b81f02d2fb74a334fb36d3b18eb407db (diff)
parenta7e36d635b4dab45320cd4744d8e0f3be5c946df (diff)
downloadrabbitmq-server-96ff740ff960d5a7aefb816b909491af2b1937a6.tar.gz
Merging bug 23157 into default
-rw-r--r--src/gen_server2.erl917
-rw-r--r--src/rabbit_amqqueue.erl36
-rw-r--r--src/rabbit_amqqueue_process.erl26
-rw-r--r--src/rabbit_channel.erl22
-rw-r--r--src/rabbit_limiter.erl7
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_msg_store_gc.erl7
-rw-r--r--src/worker_pool_worker.erl7
8 files changed, 562 insertions, 492 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 9fb9e2fe..e2bb940f 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,10 +16,12 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
-%% allow callers to attach priorities to requests. Requests with
-%% higher priorities are processed before requests with lower
-%% priorities. The default priority is 0.
+%% 4) The callback module can optionally implement prioritise_call/3,
+%% prioritise_cast/2 and prioritise_info/2. These functions take
+%% Message, From and State or just Message and State and return a
+%% single integer representing the priority attached to the message.
+%% Messages with higher priorities are processed before requests with
+%% lower priorities. The default priority is 0.
%%
%% 5) The callback module can optionally implement
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
@@ -82,13 +84,13 @@
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
-%%% kind of inputs.
+%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%
%%% The user module should export:
%%%
-%%% init(Args)
+%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% {ok, State, Timeout, Backoff}
@@ -101,21 +103,21 @@
%%% {reply, Reply, State, Timeout}
%%% {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, Reply, State}
+%%% {stop, Reason, Reply, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_cast(Msg, State)
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
@@ -159,37 +161,41 @@
%% API
-export([start/3, start/4,
- start_link/3, start_link/4,
- call/2, call/3, pcall/3, pcall/4,
- cast/2, pcast/3, reply/2,
- abcast/2, abcast/3,
- multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
+ start_link/3, start_link/4,
+ call/2, call/3,
+ cast/2, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
-export([behaviour_info/1]).
%% System exports
-export([system_continue/3,
- system_terminate/4,
- system_code_change/4,
- format_status/2]).
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
%% Internal exports
-export([init_it/6, print_event/3]).
-import(error_logger, [format/2]).
+%% State record
+-record(gs2_state, {parent, name, state, mod, time,
+ timeout_state, queue, debug, prioritise_call,
+ prioritise_cast, prioritise_info}).
+
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
%%%=========================================================================
-ifdef(use_specs).
--spec(handle_common_termination/6 ::
- (any(), any(), any(), atom(), any(), any()) -> no_return()).
+-spec(handle_common_termination/3 ::
+ (any(), atom(), #gs2_state{}) -> no_return()).
--spec(hibernate/7 ::
- (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
+-spec(hibernate/1 :: (#gs2_state{}) -> no_return()).
-endif.
@@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) ->
%% be monitored.
%% If the client is trapping exits and is linked server termination
%% is handled here (? Shall we do that here (or rely on timeouts) ?).
-%% -----------------------------------------------------------------
+%% -----------------------------------------------------------------
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
end.
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
- end.
-
-pcall(Name, Priority, Request) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
- end.
-
-pcall(Name, Priority, Request, Timeout) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
%% -----------------------------------------------------------------
@@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) ->
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
-cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
-do_cast(Dest, Request) ->
+do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
-
-cast_msg(Request) -> {'$gen_cast',Request}.
-pcast({global,Name}, Priority, Request) ->
- catch global:send(Name, cast_msg(Priority, Request)),
- ok;
-pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_atom(Dest) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_pid(Dest) ->
- do_cast(Dest, Priority, Request).
-
-do_cast(Dest, Priority, Request) ->
- do_send(Dest, cast_msg(Priority, Request)),
- ok.
-
-cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+cast_msg(Request) -> {'$gen_cast',Request}.
%% -----------------------------------------------------------------
%% Send a reply to the client.
@@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply}.
-%% -----------------------------------------------------------------
-%% Asyncronous broadcast, returns nothing, it's just send'n prey
-%%-----------------------------------------------------------------
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n pray
+%% -----------------------------------------------------------------
abcast(Name, Request) when is_atom(Name) ->
do_abcast([node() | nodes()], Name, cast_msg(Request)).
@@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast.
%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
-%%%
+%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
-%%% queue, it would probably become confused. Late answers will
+%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Req)
when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Req, infinity).
-multi_call(Nodes, Name, Req)
+multi_call(Nodes, Name, Req)
when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Req, infinity).
multi_call(Nodes, Name, Req, infinity) ->
do_multi_call(Nodes, Name, Req, infinity);
-multi_call(Nodes, Name, Req, Timeout)
+multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
-%%
-%% Description: Makes an existing process into a gen_server.
-%% The calling process will enter the gen_server receive
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
%% loop and become a gen_server process.
-%% The process *must* have been started using one of the
-%% start functions in proc_lib, see proc_lib(3).
-%% The user is responsible for any initialization of the
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
@@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
Backoff1 = extend_backoff(Backoff),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
+ loop(find_prioritisers(
+ #gs2_state { parent = Parent, name = Name, state = State,
+ mod = Mod, time = Timeout, timeout_state = Backoff1,
+ queue = Queue, debug = Debug })).
%%%========================================================================
%%% Gen-callback functions
@@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
+ GS2State = find_prioritisers(
+ #gs2_state { parent = Parent,
+ name = Name,
+ mod = Mod,
+ queue = Queue,
+ debug = Debug }),
case catch Mod:init(Args) of
- {ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
- {ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
- {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = infinity,
+ timeout_state = undefined });
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = undefined });
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
Backoff1 = extend_backoff(Backoff),
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
- {stop, Reason} ->
- %% For consistency, we must make sure that the
- %% registered name (if any) is unregistered before
- %% the parent process is notified about the failure.
- %% (Otherwise, the parent process could get
- %% an 'already_started' error if it immediately
- %% tried starting the process again.)
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- ignore ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, ignore),
- exit(normal);
- {'EXIT', Reason} ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- Else ->
- Error = {bad_return_value, Else},
- proc_lib:init_ack(Starter, {error, Error}),
- exit(Error)
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = Backoff1 });
+ {stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
end.
name({local,Name}) -> Name;
@@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
-loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- drain(Queue), Debug).
+loop(GS2State = #gs2_state { time = hibernate,
+ timeout_state = undefined }) ->
+ pre_hibernate(GS2State);
+loop(GS2State) ->
+ process_next_msg(drain(GS2State)).
-drain(Queue) ->
+drain(GS2State) ->
receive
- Input -> drain(in(Input, Queue))
- after 0 -> Queue
+ Input -> drain(in(Input, GS2State))
+ after 0 -> GS2State
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+process_next_msg(GS2State = #gs2_state { time = Time,
+ timeout_state = TimeoutState,
+ queue = Queue }) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
- process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Msg);
+ process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
@@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
Input ->
%% Time could be 'hibernate' here, so *don't* call loop
process_next_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- drain(in(Input, Queue1)), Debug)
+ drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
after Time1 ->
case HibOnTimeout of
true ->
pre_hibernate(
- Parent, Name, State, Mod, TimeoutState, Queue1,
- Debug);
+ GS2State #gs2_state { queue = Queue1 });
false ->
- process_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, timeout)
+ process_msg(timeout,
+ GS2State #gs2_state { queue = Queue1 })
end
end
end.
-wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
TimeoutState1 = case TS of
undefined ->
undefined;
{SleptAt, TimeoutState} ->
adjust_timeout_state(SleptAt, now(), TimeoutState)
end,
- post_hibernate(Parent, Name, State, Mod, TimeoutState1,
- drain(Queue), Debug).
+ post_hibernate(
+ drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
-hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
undefined -> undefined;
{backoff, _, _, _, _} -> {now(), TimeoutState}
end,
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
- TS, Queue, Debug]).
+ proc_lib:hibernate(?MODULE, wake_hib,
+ [GS2State #gs2_state { timeout_state = TS }]).
-pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+pre_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
true ->
case catch Mod:handle_pre_hibernate(State) of
{hibernate, NState} ->
- hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
- Debug);
+ hibernate(GS2State #gs2_state { state = NState } );
Reply ->
- handle_common_termination(Reply, Name, pre_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, pre_hibernate, GS2State)
end;
false ->
- hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ hibernate(GS2State)
end.
-post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+post_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_post_hibernate, 1) of
true ->
case catch Mod:handle_post_hibernate(State) of
{noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = infinity });
{noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = Time });
Reply ->
- handle_common_termination(Reply, Name, post_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, post_hibernate, GS2State)
end;
false ->
%% use hibernate here, not infinity. This matches
@@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
%% still set to hibernate, iff that msg is the very msg
%% that woke us up (or the first msg we receive after
%% waking up).
- process_next_msg(Parent, Name, State, Mod, hibernate,
- TimeoutState, Queue, Debug)
+ process_next_msg(GS2State #gs2_state { time = hibernate })
end.
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
@@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-in({'$gen_pcast', {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
-in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
-in(Input, Queue) ->
- priority_queue:in(Input, Queue).
-
-process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Msg) ->
+in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_cast', Msg},
+ PC(Msg, GS2State), Queue) };
+in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_call', From, Msg},
+ PC(Msg, From, GS2State), Queue) };
+in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ Input, PI(Input, GS2State), Queue) }.
+
+process_msg(Msg,
+ GS2State = #gs2_state { parent = Parent,
+ name = Name,
+ debug = Debug }) ->
case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(
+ {system, From, Req} ->
+ sys:handle_system_msg(
Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, TimeoutState, Queue]);
+ GS2State);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Name, Msg, Mod, State, Debug);
- _Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
- Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
- Debug1)
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Msg, GS2State);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, GS2State);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
end.
%%% ---------------------------------------------------
@@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver =
- spawn(
- fun () ->
- %% Middleman process. Should be unsensitive to regular
- %% exit signals. The sychronization is needed in case
- %% the receiver would exit before the caller started
- %% the monitor.
- process_flag(trap_exit, true),
- Mref = erlang:monitor(process, Caller),
- receive
- {Caller,Tag} ->
- Monitors = send_nodes(Nodes, Name, Tag, Req),
- TimerId = erlang:start_timer(Timeout, self(), ok),
- Result = rec_nodes(Tag, Monitors, Name, TimerId),
- exit({self(),Tag,Result});
- {'DOWN',Mref,_,_,_} ->
- %% Caller died before sending us the go-ahead.
- %% Give up silently.
- exit(normal)
- end
- end),
+ spawn(
+ fun () ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(),Tag},
receive
- {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
- Result;
- {'DOWN',Mref,_,_,Reason} ->
- %% The middleman code failed. Or someone did
- %% exit(_, kill) on the middleman process => Reason==killed
- exit(Reason)
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
end.
send_nodes(Nodes, Name, Tag, Req) ->
@@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors)
send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
%% Skip non-atom Node
send_nodes(Tail, Name, Tag, Req, Monitors);
-send_nodes([], _Name, _Tag, _Req, Monitors) ->
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
Monitors.
%% Against old nodes:
@@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) ->
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
-rec_nodes(Tag, Nodes, Name, TimerId) ->
+rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], Time, TimerId);
- {timeout, TimerId, _} ->
- unmonitor(R),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], 2000, TimerId);
- {timeout, TimerId, _} ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
- case rpc:call(N, erlang, whereis, [Name]) of
- Pid when is_pid(Pid) -> % It exists try again.
- rec_nodes(Tag, [N|Tail], Name, Badnodes,
- Replies, infinity, TimerId);
- _ -> % badnode
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes],
- Replies, 2000, TimerId)
- end
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
- false -> % It has already sent it's message
- receive
- {timeout, TimerId, _} -> ok
- after 0 ->
- ok
- end;
- _ -> % Timer was cancelled, or TimerId was 'undefined'
- ok
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
end,
{Replies, Badnodes}.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
@@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
- Ref = make_ref(),
- self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
- {Node, Ref};
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
true ->
- case catch erlang:monitor(process, {Name, Node}) of
- {'EXIT', _} ->
- %% Remote node is R6
- monitor_node(Node, true),
- Node;
- Ref when is_reference(Ref) ->
- {Node, Ref}
- end
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
end.
%% Cancels a monitor started with Ref=erlang:monitor(_, _).
unmonitor(Ref) when is_reference(Ref) ->
erlang:demonitor(Ref),
receive
- {'DOWN', Ref, _, _, _} ->
- true
+ {'DOWN', Ref, _, _, _} ->
+ true
after 0 ->
- true
+ true
end.
%%% ---------------------------------------------------
@@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) ->
dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {reply, Reply, NState, Time1} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, [])),
- reply(From, Reply),
- exit(R);
- Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue)
- end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue).
-
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
+ reply(From, Reply),
+ [];
+common_reply(Name, From, Reply, NState, Debug) ->
+ reply(Name, From, Reply, NState, Debug).
+
+common_debug([] = _Debug, _Func, _Info, _Event) ->
+ [];
+common_debug(Debug, Func, Info, Event) ->
+ sys:handle_debug(Debug, Func, Info, Event).
+
+handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
+ state = State,
+ name = Name,
+ debug = Debug }) ->
case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {reply, Reply, NState, Time1} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
- reply(Name, From, Reply, NState, Debug),
- exit(R);
- Other ->
- handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug)
+ {reply, Reply, NState} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {reply, Reply, NState, Time1} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1});
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Msg,
+ GS2State #gs2_state { state = NState })),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other, Msg, GS2State)
end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug).
+ handle_common_reply(Reply, Msg, GS2State).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue) ->
+handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
+ debug = Debug}) ->
case Reply of
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1 });
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, [])
+ handle_common_termination(Reply, Msg, GS2State)
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
- Debug) ->
+handle_common_termination(Reply, Msg, GS2State) ->
case Reply of
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Msg, GS2State #gs2_state { state = NState });
+ {'EXIT', What} ->
+ terminate(What, Msg, GS2State);
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
- end.
-
-handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
- case Reply of
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, Debug);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, Debug);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ terminate({bad_return_value, Reply}, Msg, GS2State)
end.
reply(Name, {To, Tag}, Reply, State, Debug) ->
reply({To, Tag}, Reply),
- sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {out, Reply, To, State} ).
+ sys:handle_debug(
+ Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}).
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
- loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+system_continue(Parent, Debug, GS2State) ->
+ loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
-ifdef(use_specs).
-spec system_terminate(_, _, _, [_]) -> no_return().
-endif.
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
- _TimeoutState, _Queue]) ->
- terminate(Reason, Name, [], Mod, State, Debug).
+system_terminate(Reason, _Parent, Debug, GS2State) ->
+ terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
-system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
- OldVsn, Extra) ->
+system_code_change(GS2State = #gs2_state { mod = Mod,
+ state = State },
+ _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} ->
- {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
- Else ->
+ {ok, NewState} ->
+ NewGS2State = find_prioritisers(
+ GS2State #gs2_state { state = NewState }),
+ {ok, [NewGS2State]};
+ Else ->
Else
end.
@@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
%%-----------------------------------------------------------------
print_event(Dev, {in, Msg}, Name) ->
case Msg of
- {'$gen_call', {From, _Tag}, Call} ->
- io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
- [Name, Call, From]);
- {'$gen_cast', Cast} ->
- io:format(Dev, "*DBG* ~p got cast ~p~n",
- [Name, Cast]);
- _ ->
- io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
end;
print_event(Dev, {out, Msg, To, State}, Name) ->
- io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
- [Name, Msg, To, State]);
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
print_event(Dev, {noreply, State}, Name) ->
io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
print_event(Dev, Event, Name) ->
@@ -973,23 +950,26 @@ print_event(Dev, Event, Name) ->
%%% Terminate the server.
%%% ---------------------------------------------------
-terminate(Reason, Name, Msg, Mod, State, Debug) ->
+terminate(Reason, Msg, #gs2_state { name = Name,
+ mod = Mod,
+ state = State,
+ debug = Debug }) ->
case catch Mod:terminate(Reason, State) of
- {'EXIT', R} ->
- error_info(R, Reason, Name, Msg, State, Debug),
- exit(R);
- _ ->
- case Reason of
- normal ->
- exit(normal);
- shutdown ->
- exit(shutdown);
- {shutdown,_}=Shutdown ->
- exit(Shutdown);
- _ ->
- error_info(Reason, undefined, Name, Msg, State, Debug),
- exit(Reason)
- end
+ {'EXIT', R} ->
+ error_info(R, Reason, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
+ _ ->
+ error_info(Reason, undefined, Name, Msg, State, Debug),
+ exit(Reason)
+ end
end.
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
@@ -1038,74 +1018,109 @@ opt(_, []) ->
debug_options(Name, Opts) ->
case opt(debug, Opts) of
- {ok, Options} -> dbg_options(Name, Options);
- _ -> dbg_options(Name, [])
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
end.
dbg_options(Name, []) ->
- Opts =
- case init:get_argument(generic_debug) of
- error ->
- [];
- _ ->
- [log, statistics]
- end,
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
dbg_opts(Name, Opts).
dbg_opts(Name, Opts) ->
case catch sys:debug_options(Opts) of
- {'EXIT',_} ->
- format("~p: ignoring erroneous debug options - ~p~n",
- [Name, Opts]),
- [];
- Dbg ->
- Dbg
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
end.
get_proc_name(Pid) when is_pid(Pid) ->
Pid;
get_proc_name({local, Name}) ->
case process_info(self(), registered_name) of
- {registered_name, Name} ->
- Name;
- {registered_name, _Name} ->
- exit(process_not_registered);
- [] ->
- exit(process_not_registered)
- end;
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
get_proc_name({global, Name}) ->
case global:safe_whereis_name(Name) of
- undefined ->
- exit(process_not_registered_globally);
- Pid when Pid =:= self() ->
- Name;
- _Pid ->
- exit(process_not_registered_globally)
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
end.
get_parent() ->
case get('$ancestors') of
- [Parent | _] when is_pid(Parent)->
+ [Parent | _] when is_pid(Parent)->
Parent;
[Parent | _] when is_atom(Parent)->
name_to_pid(Parent);
- _ ->
- exit(process_was_not_started_by_proc_lib)
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
end.
name_to_pid(Name) ->
case whereis(Name) of
- undefined ->
- case global:safe_whereis_name(Name) of
- undefined ->
- exit(could_not_find_registerd_name);
- Pid ->
- Pid
- end;
- Pid ->
- Pid
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
+ PrioriCall = function_exported_or_default(
+ Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritise_call = PrioriCall,
+ prioritise_cast = PrioriCast,
+ prioritise_info = PrioriInfo }.
+
+function_exported_or_default(Mod, Fun, Arity, Default) ->
+ case erlang:function_exported(Mod, Fun, Arity) of
+ true -> case Arity of
+ 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end;
+ 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, From, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end
+ end;
+ false -> Default
end.
%%-----------------------------------------------------------------
@@ -1115,25 +1130,23 @@ format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug,
[Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
NameTag = if is_pid(Name) ->
- pid_to_list(Name);
- is_atom(Name) ->
- Name
- end,
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
Header = lists:concat(["Status for generic server ", NameTag]),
Log = sys:get_debug(log, Debug, []),
- Specfic =
- case erlang:function_exported(Mod, format_status, 2) of
- true ->
- case catch Mod:format_status(Opt, [PDict, State]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ ->
- [{data, [{"State", State}]}]
- end,
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true -> case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ -> [{data, [{"State", State}]}]
+ end,
[{header, Header},
{data, [{"Status", SysState},
- {"Parent", Parent},
- {"Logged events", Log},
+ {"Parent", Parent},
+ {"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
Specfic].
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7116653c..3e677c38 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -332,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, info, infinity).
+ delegate_call(QPid, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -345,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, consumers, infinity).
+ delegate_call(QPid, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -357,7 +357,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
- delegate_pcast(QPid, 7, emit_stats).
+ delegate_cast(QPid, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -380,10 +380,10 @@ requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+ delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
@@ -419,10 +419,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {unblock, ChPid}).
+ delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -452,20 +452,19 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
- infinity).
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
update_ram_duration(QPid) ->
- gen_server2:pcast(QPid, 8, update_ram_duration).
+ gen_server2:cast(QPid, update_ram_duration).
set_ram_duration_target(QPid, Duration) ->
- gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+ gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
- gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
maybe_expire(QPid) ->
- gen_server2:pcast(QPid, 8, maybe_expire).
+ gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
[Hook() ||
@@ -505,11 +504,6 @@ safe_delegate_call_ok(F, Pids) ->
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
-delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
- fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-
-delegate_pcast(Pid, Pri, Msg) ->
- delegate:invoke_no_result(Pid,
- fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
+delegate_cast(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9a40580e..91877efb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,8 @@
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-import(queue).
-import(erlang).
@@ -589,6 +590,29 @@ emit_stats(State) ->
%---------------------------------------------------------------------------
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ _ -> 0
+ end.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 174eab40..f19f98d2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,7 +41,8 @@
-export([emit_stats/1, flush/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
@@ -131,10 +132,10 @@ list() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:pcall(Pid, 9, info, infinity).
+ gen_server2:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ case gen_server2:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -146,7 +147,7 @@ info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
emit_stats(Pid) ->
- gen_server2:pcast(Pid, 7, emit_stats).
+ gen_server2:cast(Pid, emit_stats).
flush(Pid) ->
gen_server2:call(Pid, flush).
@@ -179,6 +180,19 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index da7078f1..c323d7ce 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2]).
+ handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -107,7 +107,7 @@ get_limit(undefined) ->
get_limit(Pid) ->
rabbit_misc:with_exit_handler(
fun () -> 0 end,
- fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+ fun () -> gen_server2:call(Pid, get_limit, infinity) end).
block(undefined) ->
ok;
@@ -126,6 +126,9 @@ unblock(LimiterPid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+prioritise_call(get_limit, _From, _State) -> 9;
+prioritise_call(_Msg, _From, _State) -> 0.
+
handle_call({can_send, _QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, State};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 18810833..bbecbfe2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -40,7 +40,7 @@
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -322,8 +322,8 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:pcall(
- Server, 2, {read, Guid}, infinity),
+ Defer = fun() -> {gen_server2:call(
+ Server, {read, Guid}, infinity),
CState} end,
case index_lookup(Guid, CState) of
not_found -> Defer();
@@ -345,18 +345,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+sync(Server) -> gen_server2:cast(Server, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
+ gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
set_maximum_since_use(Server, Age) ->
- gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Server, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -376,7 +376,7 @@ client_delete_and_terminate(CState, Server, Ref) ->
ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
- gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
+ gen_server2:call(Server, successfully_recovered_state, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -575,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {new_client_state, _Ref} -> 7;
+ successfully_recovered_state -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ sync -> 8;
+ {gc_done, _Reclaimed, _Source, _Destination} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ _ -> 0
+ end.
+
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
noreply(State1);
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index c7948b7e..a7855bbf 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-record(gcstate,
{dir,
@@ -81,7 +81,7 @@ stop(Server) ->
gen_server2:call(Server, stop, infinity).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
@@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 42049d50..f461a539 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -71,7 +71,7 @@ submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
run({M, F, A}) ->
apply(M, F, A);
@@ -88,6 +88,9 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),