diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-15 17:21:20 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-15 17:21:20 +0100 |
commit | 96ff740ff960d5a7aefb816b909491af2b1937a6 (patch) | |
tree | 092970154aee0a16b418027822dd8e023b9a6d00 | |
parent | 5e375269b81f02d2fb74a334fb36d3b18eb407db (diff) | |
parent | a7e36d635b4dab45320cd4744d8e0f3be5c946df (diff) | |
download | rabbitmq-server-96ff740ff960d5a7aefb816b909491af2b1937a6.tar.gz |
Merging bug 23157 into default
-rw-r--r-- | src/gen_server2.erl | 917 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 36 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 26 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 7 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 32 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 7 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 7 |
8 files changed, 562 insertions, 492 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 9fb9e2fe..e2bb940f 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,10 +16,12 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 -%% allow callers to attach priorities to requests. Requests with -%% higher priorities are processed before requests with lower -%% priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/3, +%% prioritise_cast/2 and prioritise_info/2. These functions take +%% Message, From and State or just Message and State and return a +%% single integer representing the priority attached to the message. +%% Messages with higher priorities are processed before requests with +%% lower priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -82,13 +84,13 @@ %%% %%% The idea behind THIS server is that the user module %%% provides (different) functions to handle different -%%% kind of inputs. +%%% kind of inputs. %%% If the Parent process terminates the Module:terminate/2 %%% function is called. %%% %%% The user module should export: %%% -%%% init(Args) +%%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} @@ -101,21 +103,21 @@ %%% {reply, Reply, State, Timeout} %%% {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} +%%% {stop, Reason, Reply, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_cast(Msg, State) %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% terminate(Reason, State) Let the user module clean up @@ -159,37 +161,41 @@ %% API -export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, pcall/3, pcall/4, - cast/2, pcast/3, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). -export([behaviour_info/1]). %% System exports -export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). + system_terminate/4, + system_code_change/4, + format_status/2]). %% Internal exports -export([init_it/6, print_event/3]). -import(error_logger, [format/2]). +%% State record +-record(gs2_state, {parent, name, state, mod, time, + timeout_state, queue, debug, prioritise_call, + prioritise_cast, prioritise_info}). + %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings %%%========================================================================= -ifdef(use_specs). --spec(handle_common_termination/6 :: - (any(), any(), any(), atom(), any(), any()) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), #gs2_state{}) -> no_return()). --spec(hibernate/7 :: - (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). +-spec(hibernate/1 :: (#gs2_state{}) -> no_return()). -endif. @@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) -> %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- +%% ----------------------------------------------------------------- call(Name, Request) -> case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) end. call(Name, Request, Timeout) -> case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -pcall(Name, Priority, Request) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) - end. - -pcall(Name, Priority, Request, Timeout) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. %% ----------------------------------------------------------------- @@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) -> cast({global,Name}, Request) -> catch global:send(Name, cast_msg(Request)), ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> do_cast(Dest, Request); cast(Dest, Request) when is_atom(Dest) -> do_cast(Dest, Request); cast(Dest, Request) when is_pid(Dest) -> do_cast(Dest, Request). -do_cast(Dest, Request) -> +do_cast(Dest, Request) -> do_send(Dest, cast_msg(Request)), ok. - -cast_msg(Request) -> {'$gen_cast',Request}. -pcast({global,Name}, Priority, Request) -> - catch global:send(Name, cast_msg(Priority, Request)), - ok; -pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_atom(Dest) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_pid(Dest) -> - do_cast(Dest, Priority, Request). - -do_cast(Dest, Priority, Request) -> - do_send(Dest, cast_msg(Priority, Request)), - ok. - -cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. +cast_msg(Request) -> {'$gen_cast',Request}. %% ----------------------------------------------------------------- %% Send a reply to the client. @@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. reply({To, Tag}, Reply) -> catch To ! {Tag, Reply}. -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n prey -%%----------------------------------------------------------------- +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n pray +%% ----------------------------------------------------------------- abcast(Name, Request) when is_atom(Name) -> do_abcast([node() | nodes()], Name, cast_msg(Request)). @@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast. %%% Make a call to servers at several nodes. %%% Returns: {[Replies],[BadNodes]} %%% A Timeout can be given -%%% +%%% %%% A middleman process is used in case late answers arrives after %%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will +%%% queue, it would probably become confused. Late answers will %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- multi_call(Name, Req) when is_atom(Name) -> do_multi_call([node() | nodes()], Name, Req, infinity). -multi_call(Nodes, Name, Req) +multi_call(Nodes, Name, Req) when is_list(Nodes), is_atom(Name) -> do_multi_call(Nodes, Name, Req, infinity). multi_call(Nodes, Name, Req, infinity) -> do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) +multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive %% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> @@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Debug = debug_options(Name, Options), Queue = priority_queue:new(), Backoff1 = extend_backoff(Backoff), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). + loop(find_prioritisers( + #gs2_state { parent = Parent, name = Name, state = State, + mod = Mod, time = Timeout, timeout_state = Backoff1, + queue = Queue, debug = Debug })). %%%======================================================================== %%% Gen-callback functions @@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), + GS2State = find_prioritisers( + #gs2_state { parent = Parent, + name = Name, + mod = Mod, + queue = Queue, + debug = Debug }), case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = infinity, + timeout_state = undefined }); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = undefined }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = Backoff1 }); + {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + unregister_name(Name0), + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) end. name({local,Name}) -> Name; @@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); -loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - drain(Queue), Debug). +loop(GS2State = #gs2_state { time = hibernate, + timeout_state = undefined }) -> + pre_hibernate(GS2State); +loop(GS2State) -> + process_next_msg(drain(GS2State)). -drain(Queue) -> +drain(GS2State) -> receive - Input -> drain(in(Input, Queue)) - after 0 -> Queue + Input -> drain(in(Input, GS2State)) + after 0 -> GS2State end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> +process_next_msg(GS2State = #gs2_state { time = Time, + timeout_state = TimeoutState, + queue = Queue }) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Msg); + process_msg(Msg, GS2State #gs2_state { queue = Queue1 }); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of @@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> Input -> %% Time could be 'hibernate' here, so *don't* call loop process_next_msg( - Parent, Name, State, Mod, Time, TimeoutState, - drain(in(Input, Queue1)), Debug) + drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) after Time1 -> case HibOnTimeout of true -> pre_hibernate( - Parent, Name, State, Mod, TimeoutState, Queue1, - Debug); + GS2State #gs2_state { queue = Queue1 }); false -> - process_msg( - Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, timeout) + process_msg(timeout, + GS2State #gs2_state { queue = Queue1 }) end end end. -wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> +wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> TimeoutState1 = case TS of undefined -> undefined; {SleptAt, TimeoutState} -> adjust_timeout_state(SleptAt, now(), TimeoutState) end, - post_hibernate(Parent, Name, State, Mod, TimeoutState1, - drain(Queue), Debug). + post_hibernate( + drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). -hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of undefined -> undefined; {backoff, _, _, _, _} -> {now(), TimeoutState} end, - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - TS, Queue, Debug]). + proc_lib:hibernate(?MODULE, wake_hib, + [GS2State #gs2_state { timeout_state = TS }]). -pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +pre_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, - Debug); + hibernate(GS2State #gs2_state { state = NState } ); Reply -> - handle_common_termination(Reply, Name, pre_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, pre_hibernate, GS2State) end; false -> - hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + hibernate(GS2State) end. -post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +post_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_post_hibernate, 1) of true -> case catch Mod:handle_post_hibernate(State) of {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = infinity }); {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = Time }); Reply -> - handle_common_termination(Reply, Name, post_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, post_hibernate, GS2State) end; false -> %% use hibernate here, not infinity. This matches @@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> %% still set to hibernate, iff that msg is the very msg %% that woke us up (or the first msg we receive after %% waking up). - process_next_msg(Parent, Name, State, Mod, hibernate, - TimeoutState, Queue, Debug) + process_next_msg(GS2State #gs2_state { time = hibernate }) end. adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, @@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_pcast', {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_cast', Msg}, Priority, Queue); -in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); -in(Input, Queue) -> - priority_queue:in(Input, Queue). - -process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Msg) -> +in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_cast', Msg}, + PC(Msg, GS2State), Queue) }; +in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_call', From, Msg}, + PC(Msg, From, GS2State), Queue) }; +in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + Input, PI(Input, GS2State), Queue) }. + +process_msg(Msg, + GS2State = #gs2_state { parent = Parent, + name = Name, + debug = Debug }) -> case Msg of - {system, From, Req} -> - sys:handle_system_msg( + {system, From, Req} -> + sys:handle_system_msg( Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + GS2State); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Name, Msg, Mod, State, Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); - _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, - Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, - Debug1) + {'EXIT', Parent, Reason} -> + terminate(Reason, Msg, GS2State); + _Msg when Debug =:= [] -> + handle_msg(Msg, GS2State); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. %%% --------------------------------------------------- @@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Tag = make_ref(), Caller = self(), Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), + spawn( + fun () -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), Mref = erlang:monitor(process, Receiver), Receiver ! {self(),Tag}, receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) end. send_nodes(Nodes, Name, Tag, Req) -> @@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors) send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> %% Skip non-atom Node send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> +send_nodes([], _Name, _Tag, _Req, Monitors) -> Monitors. %% Against old nodes: @@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) -> %% Against contemporary nodes: %% Wait for reply, server 'DOWN', or timeout from TimerId. -rec_nodes(Tag, Nodes, Name, TimerId) -> +rec_nodes(Tag, Nodes, Name, TimerId) -> rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end end; rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok end, {Replies, Badnodes}. %% Collect all replies that already have arrived rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> {Replies, Badnodes}. @@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end end. %% Cancels a monitor started with Ref=erlang:monitor(_, _). unmonitor(Ref) when is_reference(Ref) -> erlang:demonitor(Ref), receive - {'DOWN', Ref, _, _, _} -> - true + {'DOWN', Ref, _, _, _} -> + true after 0 -> - true + true end. %%% --------------------------------------------------- @@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) -> dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {reply, Reply, NState, Time1} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, [])), - reply(From, Reply), - exit(R); - Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) - end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue). - -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +common_reply(_Name, From, Reply, _NState, [] = _Debug) -> + reply(From, Reply), + []; +common_reply(Name, From, Reply, NState, Debug) -> + reply(Name, From, Reply, NState, Debug). + +common_debug([] = _Debug, _Func, _Info, _Event) -> + []; +common_debug(Debug, Func, Info, Event) -> + sys:handle_debug(Debug, Func, Info, Event). + +handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, + state = State, + name = Name, + debug = Debug }) -> case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {reply, Reply, NState, Time1} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), - reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug) + {reply, Reply, NState} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {reply, Reply, NState, Time1} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1}); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = infinity, + debug = Debug1}); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = Time1, + debug = Debug1}); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Msg, + GS2State #gs2_state { state = NState })), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, Msg, GS2State) end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug). + handle_common_reply(Reply, Msg, GS2State). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) -> +handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, + debug = Debug}) -> case Reply of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1 }); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, []) + handle_common_termination(Reply, Msg, GS2State) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, - Debug) -> +handle_common_termination(Reply, Msg, GS2State) -> case Reply of - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Msg, GS2State #gs2_state { state = NState }); + {'EXIT', What} -> + terminate(What, Msg, GS2State); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, Debug) - end. - -handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> - case Reply of - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, Debug); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, Debug); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + terminate({bad_return_value, Reply}, Msg, GS2State) end. reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), - sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {out, Reply, To, State} ). + sys:handle_debug( + Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). +system_continue(Parent, Debug, GS2State) -> + loop(GS2State #gs2_state { parent = Parent, debug = Debug }). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). -endif. -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, - _TimeoutState, _Queue]) -> - terminate(Reason, Name, [], Mod, State, Debug). +system_terminate(Reason, _Parent, Debug, GS2State) -> + terminate(Reason, [], GS2State #gs2_state { debug = Debug }). -system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, - OldVsn, Extra) -> +system_code_change(GS2State = #gs2_state { mod = Mod, + state = State }, + _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; - Else -> + {ok, NewState} -> + NewGS2State = find_prioritisers( + GS2State #gs2_state { state = NewState }), + {ok, [NewGS2State]}; + Else -> Else end. @@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, %%----------------------------------------------------------------- print_event(Dev, {in, Msg}, Name) -> case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) end; print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); print_event(Dev, {noreply, State}, Name) -> io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); print_event(Dev, Event, Name) -> @@ -973,23 +950,26 @@ print_event(Dev, Event, Name) -> %%% Terminate the server. %%% --------------------------------------------------- -terminate(Reason, Name, Msg, Mod, State, Debug) -> +terminate(Reason, Msg, #gs2_state { name = Name, + mod = Mod, + state = State, + debug = Debug }) -> case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Reason, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); - _ -> - error_info(Reason, undefined, Name, Msg, State, Debug), - exit(Reason) - end + {'EXIT', R} -> + error_info(R, Reason, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); + _ -> + error_info(Reason, undefined, Name, Msg, State, Debug), + exit(Reason) + end end. error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> @@ -1038,74 +1018,109 @@ opt(_, []) -> debug_options(Name, Opts) -> case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) end. dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, dbg_opts(Name, Opts); dbg_options(Name, Opts) -> dbg_opts(Name, Opts). dbg_opts(Name, Opts) -> case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg end. get_proc_name(Pid) when is_pid(Pid) -> Pid; get_proc_name({local, Name}) -> case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; get_proc_name({global, Name}) -> case global:safe_whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) end. get_parent() -> case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> + [Parent | _] when is_pid(Parent)-> Parent; [Parent | _] when is_atom(Parent)-> name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) + _ -> + exit(process_was_not_started_by_proc_lib) end. name_to_pid(Name) -> case whereis(Name) of - undefined -> - case global:safe_whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> + PrioriCall = function_exported_or_default( + Mod, 'prioritise_call', 3, + fun (_Msg, _From, _State) -> 0 end), + PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + fun (_Msg, _State) -> 0 end), + PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + fun (_Msg, _State) -> 0 end), + GS2State #gs2_state { prioritise_call = PrioriCall, + prioritise_cast = PrioriCast, + prioritise_info = PrioriInfo }. + +function_exported_or_default(Mod, Fun, Arity, Default) -> + case erlang:function_exported(Mod, Fun, Arity) of + true -> case Arity of + 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end; + 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, From, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end + end; + false -> Default end. %%----------------------------------------------------------------- @@ -1115,25 +1130,23 @@ format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, + pid_to_list(Name); + is_atom(Name) -> + Name + end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> - case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> - [{data, [{"State", State}]}] - end, + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> [{data, [{"State", State}]}] + end, [{header, Header}, {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, + {"Parent", Parent}, + {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7116653c..3e677c38 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -332,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, info, infinity). + delegate_call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -345,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, consumers, infinity). + delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -357,7 +357,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> - delegate_pcast(QPid, 7, emit_stats). + delegate_cast(QPid, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -380,10 +380,10 @@ requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> - delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -419,10 +419,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate_pcast(QPid, 7, {unblock, ChPid}). + delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -452,20 +452,19 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, - infinity). + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> - gen_server2:pcast(QPid, 8, update_ram_duration). + gen_server2:cast(QPid, update_ram_duration). set_ram_duration_target(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> - gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(QPid, {set_maximum_since_use, Age}). maybe_expire(QPid) -> - gen_server2:pcast(QPid, 8, maybe_expire). + gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> [Hook() || @@ -505,11 +504,6 @@ safe_delegate_call_ok(F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). -delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, - fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, - fun (P) -> gen_server2:pcast(P, Pri, Msg) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9a40580e..91877efb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,8 @@ -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -import(queue). -import(erlang). @@ -589,6 +590,29 @@ emit_stats(State) -> %--------------------------------------------------------------------------- +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + _ -> 0 + end. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 174eab40..f19f98d2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,7 +41,8 @@ -export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, @@ -131,10 +132,10 @@ list() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:pcall(Pid, 9, info, infinity). + gen_server2:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + case gen_server2:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -146,7 +147,7 @@ info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). emit_stats(Pid) -> - gen_server2:pcast(Pid, 7, emit_stats). + gen_server2:cast(Pid, emit_stats). flush(Pid) -> gen_server2:call(Pid, flush). @@ -179,6 +180,19 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index da7078f1..c323d7ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). + handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -107,7 +107,7 @@ get_limit(undefined) -> get_limit(Pid) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + fun () -> gen_server2:call(Pid, get_limit, infinity) end). block(undefined) -> ok; @@ -126,6 +126,9 @@ unblock(LimiterPid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +prioritise_call(get_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. + handle_call({can_send, _QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, State}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 18810833..bbecbfe2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -40,7 +40,7 @@ -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -322,8 +322,8 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:pcall( - Server, 2, {read, Guid}, infinity), + Defer = fun() -> {gen_server2:call( + Server, {read, Guid}, infinity), CState} end, case index_lookup(Guid, CState) of not_found -> Defer(); @@ -345,18 +345,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal +sync(Server) -> gen_server2:cast(Server, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). set_maximum_since_use(Server, Age) -> - gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Server, {set_maximum_since_use, Age}). client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -376,7 +376,7 @@ client_delete_and_terminate(CState, Server, Ref) -> ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> - gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). + gen_server2:call(Server, successfully_recovered_state, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -575,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + {new_client_state, _Ref} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + sync -> 8; + {gc_done, _Reclaimed, _Source, _Destination} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 + end. + handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), noreply(State1); diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index c7948b7e..a7855bbf 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -record(gcstate, {dir, @@ -81,7 +81,7 @@ stop(Server) -> gen_server2:call(Server, stop, infinity). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 42049d50..f461a539 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -71,7 +71,7 @@ submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). run({M, F, A}) -> apply(M, F, A); @@ -88,6 +88,9 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), |