diff options
author | Luke Bakken <luke@bakken.io> | 2022-10-21 11:13:23 -0700 |
---|---|---|
committer | Luke Bakken <luke@bakken.io> | 2022-10-21 11:13:23 -0700 |
commit | cfb7dae93f295af5d7a03fca15358bd6c0df9cbe (patch) | |
tree | f50a27e2fcfb3cf617417bc5ff60b0bdeb0fd636 | |
parent | eb0c2fc472ec44ca4dba6abca9844720b21fad98 (diff) | |
download | rabbitmq-server-git-lukebakken/update-gen_server2.tar.gz |
Begin updating gen_server2lukebakken/update-gen_server2
Uses OTP 25.1.1 as a base version
-rw-r--r-- | deps/rabbit_common/src/gen_server2.erl | 2302 |
1 files changed, 1356 insertions, 946 deletions
diff --git a/deps/rabbit_common/src/gen_server2.erl b/deps/rabbit_common/src/gen_server2.erl index c588d9d867..ae2c43ee34 100644 --- a/deps/rabbit_common/src/gen_server2.erl +++ b/deps/rabbit_common/src/gen_server2.erl @@ -1,5 +1,5 @@ -%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP -%% distribution, with the following modifications: +%% This file is a copy of gen_server.erl from Erlang/OTP 25.1.1 +%% with the following modifications: %% %% 1) the module name is gen_server2 %% @@ -92,33 +92,36 @@ %% particular node. %% %% 11) Internal buffer length is emitted as a core [RabbitMQ] metric. - +%% %% All modifications are (C) 2009-2022 VMware, Inc. or its affiliates. - -%% ``The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved via the world wide web at https://www.erlang.org/. %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. +%% Copyright Ericsson AB 1996-2022. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at %% -%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. -%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings -%% AB. All Rights Reserved.'' +%% http://www.apache.org/licenses/LICENSE-2.0 %% -%% $Id$ +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% -module(gen_server2). --ifdef(OTP_RELEASE). --if(?OTP_RELEASE >= 22). --compile(nowarn_deprecated_function). --endif. --endif. +%% TODO LRB +%% -ifdef(OTP_RELEASE). +%% -if(?OTP_RELEASE >= 22). +%% -compile(nowarn_deprecated_function). +%% -endif. +%% -endif. + +%%% +%%% NOTE: If init_ack() return values are modified, see comment +%%% above monitor_return() in gen.erl! +%%% %%% --------------------------------------------------- %%% @@ -204,20 +207,29 @@ %% API -export([start/3, start/4, start_link/3, start_link/4, + start_monitor/3, start_monitor/4, stop/1, stop/3, call/2, call/3, + send_request/2, send_request/4, + wait_response/2, receive_response/2, check_response/2, + wait_response/3, receive_response/3, check_response/3, + reqids_new/0, reqids_size/1, + reqids_add/3, reqids_to_list/1, cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - mcall/1, - with_state/2, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]). %% System exports -export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). + system_terminate/4, + system_code_change/4, + system_get_state/1, + system_replace_state/2, + format_status/2]). + +%% logger callback +-export([format_log/1, format_log/2]). %% Internal exports -export([init_it/6]). @@ -229,20 +241,31 @@ timeout_state, queue, debug, prioritisers, timer, emit_stats_fun, stop_stats_fun}). -%%%========================================================================= -%%% Specs. These exist only to shut up dialyzer's warnings -%%%========================================================================= - --type gs2_state() :: #gs2_state{}. - --spec handle_common_termination(any(), atom(), gs2_state()) -> no_return(). --spec hibernate(gs2_state()) -> no_return(). --spec pre_hibernate(gs2_state()) -> no_return(). --spec system_terminate(_, _, _, gs2_state()) -> no_return(). +-export_type( + [from/0, + reply_tag/0, + request_id/0, + request_id_collection/0, + format_status/0]). + +-export_type( + [server_name/0, + server_ref/0, + start_opt/0, + enter_loop_opt/0, + start_ret/0, + start_mon_ret/0]). -type millis() :: non_neg_integer(). --dialyzer({nowarn_function, do_multi_call/4}). +-define( + STACKTRACE(), + element(2, erlang:process_info(self(), current_stacktrace))). + +-define( + is_timeout(X), + ( (X) =:= infinity orelse ( is_integer(X) andalso (X) >= 0 ) ) +). %%%========================================================================= %%% API @@ -250,39 +273,71 @@ -callback init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), timeout() | hibernate} | + {ok, State :: term(), timeout() | hibernate | {continue, term()}} | {ok, State :: term(), timeout() | hibernate, {backoff, millis(), millis(), millis()}} | {ok, State :: term(), timeout() | hibernate, {backoff, millis(), millis(), millis()}, atom()} | ignore | {stop, Reason :: term()}. --callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, +-callback handle_call(Request :: term(), From :: from(), State :: term()) -> {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} | + {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} | {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), - Reply :: term(), NewState :: term()}. + Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}. -callback handle_cast(Request :: term(), State :: term()) -> {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), NewState :: term()}. --callback handle_info(Info :: term(), State :: term()) -> +-callback handle_info(Info :: timeout | term(), State :: term()) -> {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | {stop, Reason :: term(), NewState :: term()}. --callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), +-callback handle_continue(Info :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. +-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | + term()), State :: term()) -> - ok | term(). + term(). -callback code_change(OldVsn :: (term() | {down, term()}), State :: term(), Extra :: term()) -> {ok, NewState :: term()} | {error, Reason :: term()}. +-callback format_status(Opt, StatusData) -> Status when + Opt :: 'normal' | 'terminate', + StatusData :: [PDict | State], + PDict :: [{Key :: term(), Value :: term()}], + State :: term(), + Status :: term(). +-type format_status() :: + #{ state => term(), + message => term(), + reason => term(), + log => [sys:system_event()] }. +-callback format_status(Status) -> NewStatus when + Status :: format_status(), + NewStatus :: format_status(). + +-optional_callbacks( + [handle_info/2, handle_continue/2, terminate/2, code_change/3, + format_status/1, format_status/2]). + + + +-type from() :: {Client :: pid(), Tag :: reply_tag()}. +-opaque reply_tag() :: gen:reply_tag(). + +-opaque request_id() :: gen:request_id(). + +-opaque request_id_collection() :: gen:request_id_collection(). -%% It's not possible to define "optional" -callbacks, so putting specs -%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result -%% in warnings (the same applied for the behaviour_info before). +-type response_timeout() :: + timeout() | {abs, integer()}. %%% ----------------------------------------------------------------- %%% Starts a generic server. @@ -290,7 +345,7 @@ %%% start(Name, Mod, Args, Options) %%% start_link(Mod, Args, Options) %%% start_link(Name, Mod, Args, Options) where: -%%% Name ::= {local, atom()} | {global, atom()} +%%% Name ::= {local, atom()} | {global, term()} | {via, atom(), term()} %%% Mod ::= atom(), callback module implementing the 'real' server %%% Args ::= term(), init arguments (to Mod:init/1) %%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] @@ -300,28 +355,125 @@ %%% {error, {already_started, Pid}} | %%% {error, Reason} %%% ----------------------------------------------------------------- -start(Mod, Args, Options) -> - gen:start(?MODULE, nolink, Mod, Args, Options). -start(Name, Mod, Args, Options) -> - gen:start(?MODULE, nolink, Name, Mod, Args, Options). +-type server_name() :: % Duplicate of gen:emgr_name() + {'local', LocalName :: atom()} + | {'global', GlobalName :: term()} + | {'via', RegMod :: module(), ViaName :: term()}. + +-type server_ref() :: % What gen:call/3,4 and gen:stop/1,3 accepts + pid() + | (LocalName :: atom()) + | {Name :: atom(), Node :: atom()} + | {'global', GlobalName :: term()} + | {'via', RegMod :: module(), ViaName :: term()}. + +-type start_opt() :: % Duplicate of gen:option() + {'timeout', Timeout :: timeout()} + | {'spawn_opt', SpawnOptions :: [proc_lib:spawn_option()]} + | enter_loop_opt(). +%% +-type enter_loop_opt() :: % Some gen:option()s works for enter_loop/* + {'hibernate_after', HibernateAfterTimeout :: timeout()} + | {'debug', Dbgs :: [sys:debug_option()]}. + +-type start_ret() :: % gen:start_ret() without monitor return + {'ok', Pid :: pid()} + | 'ignore' + | {'error', Reason :: term()}. -start_link(Mod, Args, Options) -> - gen:start(?MODULE, link, Mod, Args, Options). +-type start_mon_ret() :: % gen:start_ret() with only monitor return + {'ok', {Pid :: pid(), MonRef :: reference()}} + | 'ignore' + | {'error', Reason :: term()}. + +%%% --------------------------------------------------- + +-spec start( + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_ret(). +%% +start(Module, Args, Options) -> + gen:start(?MODULE, nolink, Module, Args, Options). + +-spec start( + ServerName :: server_name(), + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_ret(). +%% +start(ServerName, Module, Args, Options) -> + gen:start(?MODULE, nolink, ServerName, Module, Args, Options). + +-spec start_link( + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_ret(). +%% +start_link(Module, Args, Options) -> + gen:start(?MODULE, link, Module, Args, Options). + +-spec start_link( + ServerName :: server_name(), + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_ret(). +%% +start_link(ServerName, Module, Args, Options) -> + gen:start(?MODULE, link, ServerName, Module, Args, Options). + +-spec start_monitor( + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_mon_ret(). +%% +start_monitor(Module, Args, Options) -> + gen:start(?MODULE, monitor, Module, Args, Options). + +-spec start_monitor( + ServerName :: server_name(), + Module :: module(), + Args :: term(), + Options :: [start_opt()] + ) -> + start_mon_ret(). +%% +start_monitor(ServerName, Module, Args, Options) -> + gen:start(?MODULE, monitor, ServerName, Module, Args, Options). -start_link(Name, Mod, Args, Options) -> - gen:start(?MODULE, link, Name, Mod, Args, Options). %% ----------------------------------------------------------------- %% Stop a generic server and wait for it to terminate. %% If the server is located at another node, that node will %% be monitored. %% ----------------------------------------------------------------- -stop(Name) -> - gen:stop(Name). -stop(Name, Reason, Timeout) -> - gen:stop(Name, Reason, Timeout). +-spec stop( + ServerRef :: server_ref() + ) -> ok. +%% +stop(ServerRef) -> + gen:stop(ServerRef). + +-spec stop( + ServerRef :: server_ref(), + Reason :: term(), + Timeout :: timeout() + ) -> ok. +%% +stop(ServerRef, Reason, Timeout) -> + gen:stop(ServerRef, Reason, Timeout). %% ----------------------------------------------------------------- %% Make a call to a generic server. @@ -329,53 +481,276 @@ stop(Name, Reason, Timeout) -> %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). +%% ----------------------------------------------------------------- + +-spec call( + ServerRef :: server_ref(), + Request :: term() + ) -> + Reply :: term(). +%% +call(ServerRef, Request) -> + case catch gen:call(ServerRef, '$gen_call', Request) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [ServerRef, Request]}}) + end. + +-spec call( + ServerRef :: server_ref(), + Request :: term(), + Timeout :: timeout() + ) -> + Reply :: term(). +%% +call(ServerRef, Request, Timeout) -> + case catch gen:call(ServerRef, '$gen_call', Request, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [ServerRef, Request, Timeout]}}) + end. + %% ----------------------------------------------------------------- -call(Name, Request) -> - case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) +%% Send a request to a generic server and return a Key which should be +%% used with wait_response/2 or check_response/2 to fetch the +%% result of the request. + +-spec send_request(ServerRef::server_ref(), Request::term()) -> + ReqId::request_id(). + +send_request(ServerRef, Request) -> + try + gen:send_request(ServerRef, '$gen_call', Request) + catch + error:badarg -> + error(badarg, [ServerRef, Request]) + end. + +-spec send_request(ServerRef::server_ref(), + Request::term(), + Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +send_request(ServerRef, Request, Label, ReqIdCol) -> + try + gen:send_request(ServerRef, '$gen_call', Request, Label, ReqIdCol) + catch + error:badarg -> + error(badarg, [ServerRef, Request, Label, ReqIdCol]) + end. + +-spec wait_response(ReqId, WaitTime) -> Result when + ReqId :: request_id(), + WaitTime :: response_timeout(), + Response :: {reply, Reply::term()} + | {error, {Reason::term(), server_ref()}}, + Result :: Response | 'timeout'. + +wait_response(ReqId, WaitTime) -> + try + gen:wait_response(ReqId, WaitTime) + catch + error:badarg -> + error(badarg, [ReqId, WaitTime]) + end. + +-spec wait_response(ReqIdCollection, WaitTime, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + WaitTime :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), server_ref()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +wait_response(ReqIdCol, WaitTime, Delete) -> + try + gen:wait_response(ReqIdCol, WaitTime, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, WaitTime, Delete]) + end. + +-spec receive_response(ReqId, Timeout) -> Result when + ReqId :: request_id(), + Timeout :: response_timeout(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), server_ref()}}, + Result :: Response | 'timeout'. + +receive_response(ReqId, Timeout) -> + try + gen:receive_response(ReqId, Timeout) + catch + error:badarg -> + error(badarg, [ReqId, Timeout]) end. -call(Name, Request, Timeout) -> - case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) +-spec receive_response(ReqIdCollection, Timeout, Delete) -> Result when + ReqIdCollection :: request_id_collection(), + Timeout :: response_timeout(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), server_ref()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'timeout'. + +receive_response(ReqIdCol, Timeout, Delete) -> + try + gen:receive_response(ReqIdCol, Timeout, Delete) + catch + error:badarg -> + error(badarg, [ReqIdCol, Timeout, Delete]) + end. + +-spec check_response(Msg, ReqId) -> Result when + Msg :: term(), + ReqId :: request_id(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), server_ref()}}, + Result :: Response | 'no_reply'. + +check_response(Msg, ReqId) -> + try + gen:check_response(Msg, ReqId) + catch + error:badarg -> + error(badarg, [Msg, ReqId]) + end. + +-spec check_response(Msg, ReqIdCollection, Delete) -> Result when + Msg :: term(), + ReqIdCollection :: request_id_collection(), + Delete :: boolean(), + Response :: {reply, Reply::term()} | + {error, {Reason::term(), server_ref()}}, + Result :: {Response, + Label::term(), + NewReqIdCollection::request_id_collection()} | + 'no_request' | + 'no_reply'. + +check_response(Msg, ReqIdCol, Delete) -> + try + gen:check_response(Msg, ReqIdCol, Delete) + catch + error:badarg -> + error(badarg, [Msg, ReqIdCol, Delete]) + end. + +-spec reqids_new() -> + NewReqIdCollection::request_id_collection(). + +reqids_new() -> + gen:reqids_new(). + +-spec reqids_size(ReqIdCollection::request_id_collection()) -> + non_neg_integer(). + +reqids_size(ReqIdCollection) -> + try + gen:reqids_size(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) + end. + +-spec reqids_add(ReqId::request_id(), Label::term(), + ReqIdCollection::request_id_collection()) -> + NewReqIdCollection::request_id_collection(). + +reqids_add(ReqId, Label, ReqIdCollection) -> + try + gen:reqids_add(ReqId, Label, ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqId, Label, ReqIdCollection]) + end. + +-spec reqids_to_list(ReqIdCollection::request_id_collection()) -> + [{ReqId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) -> + try + gen:reqids_to_list(ReqIdCollection) + catch + error:badarg -> error(badarg, [ReqIdCollection]) end. %% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- + +-spec cast( + ServerRef :: server_ref(), + Request :: term() + ) -> + ok. +%% cast({global,Name}, Request) -> - catch global:send(Name, {'$gen_cast', Request}), + catch global:send(Name, cast_msg(Request)), ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> - catch (Dest ! {'$gen_cast', Request}), +cast({via, Mod, Name}, Request) -> + catch Mod:send(Name, cast_msg(Request)), ok; -cast(Dest, Request) when is_atom(Dest); is_pid(Dest) -> - catch (Dest ! {'$gen_cast', Request}), +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_atom(Dest) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_pid(Dest) -> + do_cast(Dest, Request). + +do_cast(Dest, Request) -> + do_send(Dest, cast_msg(Request)), ok. + +cast_msg(Request) -> {'$gen_cast',Request}. %% ----------------------------------------------------------------- %% Send a reply to the client. %% ----------------------------------------------------------------- -reply({To, Tag}, Reply) -> - catch To ! {Tag, Reply}. -%% ----------------------------------------------------------------- -%% Asynchronous broadcast, returns nothing, it's just send'n pray -%% ----------------------------------------------------------------- +-spec reply( + Client :: from(), + Reply :: term() + ) -> + ok. +%% +reply(Client, Reply) -> + gen:reply(Client, Reply). + +%% ----------------------------------------------------------------- +%% Asynchronous broadcast, returns nothing, it's just send 'n' pray +%%----------------------------------------------------------------- + +-spec abcast( + Name :: atom(), + Request :: term() + ) -> + abcast. +%% abcast(Name, Request) when is_atom(Name) -> - do_abcast([node() | nodes()], Name, {'$gen_cast', Request}). - + do_abcast([node() | nodes()], Name, cast_msg(Request)). + +-spec abcast( + Nodes :: [node()], + Name :: atom(), + Request :: term() + ) -> + abcast. +%% abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_abcast(Nodes, Name, {'$gen_cast', Request}). + do_abcast(Nodes, Name, cast_msg(Request)). do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> - catch ({Name, Node} ! Msg), + do_send({Name,Node},Msg), do_abcast(Nodes, Name, Msg); do_abcast([], _,_) -> abcast. @@ -383,158 +758,167 @@ do_abcast([], _,_) -> abcast. %%% Make a call to servers at several nodes. %%% Returns: {[Replies],[BadNodes]} %%% A Timeout can be given -%%% +%%% %%% A middleman process is used in case late answers arrives after %%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will +%%% queue, it would probably become confused. Late answers will %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- -multi_call(Name, Req) - when is_atom(Name) -> - do_multi_call([node() | nodes()], Name, Req, infinity). -multi_call(Nodes, Name, Req) +-spec multi_call( + Name :: atom(), + Request :: term() + ) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Name, Request) + when is_atom(Name) -> + do_multi_call([node() | nodes()], Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term() + ) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> - do_multi_call(Nodes, Name, Req, infinity). - -multi_call(Nodes, Name, Req, infinity) -> - do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) - when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> - do_multi_call(Nodes, Name, Req, Timeout). - -%%% ----------------------------------------------------------------- -%%% Make multiple calls to multiple servers, given pairs of servers -%%% and messages. -%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} -%%% -%%% Dest can be pid() | RegName :: atom() | -%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} -%%% -%%% A middleman process is used to avoid clogging up the callers -%%% message queue. -%%% ----------------------------------------------------------------- -mcall(CallSpecs) -> - Tag = make_ref(), - {_, MRef} = spawn_monitor( - fun() -> - Refs = lists:foldl( - fun ({Dest, _Request}=S, Dict) -> - dict:store(do_mcall(S), Dest, Dict) - end, dict:new(), CallSpecs), - collect_replies(Tag, Refs, [], []) - end), - receive - {'DOWN', MRef, _, _, {Tag, Result}} -> Result; - {'DOWN', MRef, _, _, Reason} -> exit(Reason) - end. - -do_mcall({{global,Name}=Dest, Request}) -> - %% whereis_name is simply an ets lookup, and is precisely what - %% global:send/2 does, yet we need a Ref to put in the call to the - %% server, so invoking whereis_name makes a lot more sense here. - case global:whereis_name(Name) of - Pid when is_pid(Pid) -> - MRef = erlang:monitor(process, Pid), - catch msend(Pid, MRef, Request), - MRef; - undefined -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, Dest, noproc}, - Ref - end; -do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> - {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 - catch msend(Dest, MRef, Request), - MRef; -do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> - MRef = erlang:monitor(process, Dest), - catch msend(Dest, MRef, Request), - MRef. - -msend(Dest, MRef, Request) -> - erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). - -collect_replies(Tag, Refs, Replies, Errors) -> - case dict:size(Refs) of - 0 -> exit({Tag, {Replies, Errors}}); - _ -> receive - {MRef, Reply} -> - {Refs1, Replies1} = handle_call_result(MRef, Reply, - Refs, Replies), - collect_replies(Tag, Refs1, Replies1, Errors); - {'DOWN', MRef, _, _, Reason} -> - Reason1 = case Reason of - noconnection -> nodedown; - _ -> Reason - end, - {Refs1, Errors1} = handle_call_result(MRef, Reason1, - Refs, Errors), - collect_replies(Tag, Refs1, Replies, Errors1) - end - end. - -handle_call_result(MRef, Result, Refs, AccList) -> - %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} - %% here, so we must cope with MRefs that we've already seen and erased - case dict:find(MRef, Refs) of - {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; - _ -> {Refs, AccList} - end. + do_multi_call(Nodes, Name, Request, infinity). + +-spec multi_call( + Nodes :: [node()], + Name :: atom(), + Request :: term(), + Timeout :: timeout() + ) -> + {Replies :: + [{Node :: node(), Reply :: term()}], + BadNodes :: [node()] + }. +%% +multi_call(Nodes, Name, Request, Timeout) + when is_list(Nodes), is_atom(Name), ?is_timeout(Timeout) -> + do_multi_call(Nodes, Name, Request, Timeout). -%% ----------------------------------------------------------------- -%% Apply a function to a generic server's state. -%% ----------------------------------------------------------------- -with_state(Name, Fun) -> - case catch gen:call(Name, '$with_state', Fun, infinity) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, with_state, [Name, Fun]}}) - end. %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive %% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the %% process, including registering a name for it. %%----------------------------------------------------------------- -enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity, undefined). - -enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - enter_loop(Mod, Options, State, self(), infinity, Backoff); - -enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, undefined); - -enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout, undefined). - -enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, Backoff); - -enter_loop(Mod, Options, State, ServerName, Timeout) -> - enter_loop(Mod, Options, State, ServerName, Timeout, undefined). - -enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> - Name = get_proc_name(ServerName), - Parent = get_parent(), - Debug = debug_options(Name, Options), - Queue = priority_queue:new(), - Backoff1 = extend_backoff(Backoff), - {EmitStatsFun, StopStatsFun} = stats_funs(), - loop(init_stats(find_prioritisers( - #gs2_state { parent = Parent, name = Name, state = State, - mod = Mod, time = Timeout, timeout_state = Backoff1, - queue = Queue, debug = Debug, - emit_stats_fun = EmitStatsFun, - stop_stats_fun = StopStatsFun }))). + +-spec enter_loop( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term() + ) -> + no_return(). +%% +enter_loop(Mod, Options, State) + when is_atom(Mod), is_list(Options) -> + enter_loop(Mod, Options, State, self(), infinity). + +-spec enter_loop( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + ServerName :: server_name() | pid() + ) -> + no_return(); + ( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + Timeout :: timeout() + ) -> + no_return(); + ( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + Hibernate :: 'hibernate' + ) -> + no_return(); + ( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + Cont :: {'continue', term()} + ) -> + no_return(). +%% +enter_loop(Mod, Options, State, ServerName = {Scope, _}) + when is_atom(Mod), is_list(Options), Scope == local; + is_atom(Mod), is_list(Options), Scope == global -> + enter_loop(Mod, Options, State, ServerName, infinity); +%% +enter_loop(Mod, Options, State, ServerName = {via, _, _}) + when is_atom(Mod), is_list(Options) -> + enter_loop(Mod, Options, State, ServerName, infinity); +%% +enter_loop(Mod, Options, State, TimeoutOrHibernate) + when is_atom(Mod), is_list(Options), ?is_timeout(TimeoutOrHibernate); + is_atom(Mod), is_list(Options), TimeoutOrHibernate =:= hibernate -> + enter_loop(Mod, Options, State, self(), TimeoutOrHibernate); +%% +enter_loop(Mod, Options, State, {continue, _}=Continue) + when is_atom(Mod), is_list(Options) -> + enter_loop(Mod, Options, State, self(), Continue). + +-spec enter_loop( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + ServerName :: server_name() | pid(), + Timeout :: timeout() + ) -> + no_return(); + ( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + ServerName :: server_name() | pid(), + Hibernate :: 'hibernate' + ) -> + no_return(); + ( + Module :: module(), + Options :: [enter_loop_opt()], + State :: term(), + ServerName :: server_name() | pid(), + Cont :: {'continue', term()} + ) -> + no_return(). +%% +enter_loop(Mod, Options, State, ServerName, TimeoutOrHibernate) + when is_atom(Mod), is_list(Options), ?is_timeout(TimeoutOrHibernate); + is_atom(Mod), is_list(Options), TimeoutOrHibernate =:= hibernate -> + Name = gen:get_proc_name(ServerName), + Parent = gen:get_parent(), + Debug = gen:debug_options(Name, Options), + HibernateAfterTimeout = gen:hibernate_after(Options), + loop(Parent, Name, State, Mod, TimeoutOrHibernate, HibernateAfterTimeout, Debug); +%% +enter_loop(Mod, Options, State, ServerName, {continue, _}=Continue) + when is_atom(Mod), is_list(Options) -> + Name = gen:get_proc_name(ServerName), + Parent = gen:get_parent(), + Debug = gen:debug_options(Name, Options), + HibernateAfterTimeout = gen:hibernate_after(Options), + loop(Parent, Name, State, Mod, Continue, HibernateAfterTimeout, Debug). %%%======================================================================== %%% Gen-callback functions @@ -550,89 +934,52 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, Parent, Name0, Mod, Args, Options) -> - Name = name(Name0), - Debug = debug_options(Name, Options), - Queue = priority_queue:new(), - {EmitStatsFun, StopStatsFun} = stats_funs(), - GS2State = find_prioritisers( - #gs2_state { parent = Parent, - name = Name, - mod = Mod, - queue = Queue, - debug = Debug, - emit_stats_fun = EmitStatsFun, - stop_stats_fun = StopStatsFun }), - case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(init_stats(GS2State#gs2_state { state = State, - time = infinity, - timeout_state = undefined })); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(init_stats( - GS2State#gs2_state { state = State, - time = Timeout, - timeout_state = undefined })); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> - Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(init_stats(GS2State#gs2_state { state = State, - time = Timeout, - timeout_state = Backoff1 })); - {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> - Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(init_stats(find_prioritisers( - GS2State#gs2_state { mod = Mod1, - state = State, - time = Timeout, - timeout_state = Backoff1 }))); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) + Name = gen:name(Name0), + Debug = gen:debug_options(Name, Options), + HibernateAfterTimeout = gen:hibernate_after(Options), + + case init_it(Mod, Args) of + {ok, {ok, State}} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug); + {ok, {ok, State, TimeoutOrHibernate}} + when ?is_timeout(TimeoutOrHibernate); + TimeoutOrHibernate =:= hibernate -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, TimeoutOrHibernate, HibernateAfterTimeout, Debug); + {ok, {ok, State, {continue, _}=Continue}} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Continue, HibernateAfterTimeout, Debug); + {ok, {stop, Reason}} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + gen:unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + {ok, ignore} -> + gen:unregister_name(Name0), + proc_lib:init_ack(Starter, ignore), + exit(normal); + {ok, Else} -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error); + {'EXIT', Class, Reason, Stacktrace} -> + gen:unregister_name(Name0), + proc_lib:init_ack(Starter, {error, terminate_reason(Class, Reason, Stacktrace)}), + erlang:raise(Class, Reason, Stacktrace) + end. +init_it(Mod, Args) -> + try + {ok, Mod:init(Args)} + catch + throw:R -> {ok, R}; + Class:R:S -> {'EXIT', Class, R, S} end. - -name({local,Name}) -> Name; -name({global,Name}) -> Name; -%% name(Pid) when is_pid(Pid) -> Pid; -%% when R12 goes away, drop the line beneath and uncomment the line above -name(Name) -> Name. - -unregister_name({local,Name}) -> - _ = (catch unregister(Name)); -unregister_name({global,Name}) -> - _ = global:unregister_name(Name); -unregister_name(Pid) when is_pid(Pid) -> - Pid; -%% Under R12 let's just ignore it, as we have a single term as Name. -%% On R13 it will never get here, as we get tuple with 'local/global' atom. -unregister_name(_Name) -> ok. - -extend_backoff(undefined) -> - undefined; -extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> - {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, - rand:seed(exsplus)}. %%%======================================================================== %%% Internal functions @@ -640,200 +987,80 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(GS2State = #gs2_state { time = hibernate, - timeout_state = undefined, - queue = Queue }) -> - case priority_queue:is_empty(Queue) of - true -> - pre_hibernate(GS2State); - false -> - process_next_msg(GS2State) - end; - -loop(GS2State) -> - process_next_msg(drain(GS2State)). - -drain(GS2State) -> - receive - Input -> drain(in(Input, GS2State)) - after 0 -> GS2State - end. - -process_next_msg(GS2State0 = #gs2_state { time = Time, - timeout_state = TimeoutState, - queue = Queue }) -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> - GS2State = ensure_stats_timer(GS2State0), - process_msg(Msg, GS2State#gs2_state { queue = Queue1 }); - {empty, Queue1} -> - {Time1, HibOnTimeout, GS2State} - = case {Time, TimeoutState} of - {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> - {Current, true, stop_stats_timer(GS2State0)}; - {hibernate, _} -> - %% wake_hib/7 will set Time to hibernate. If - %% we were woken and didn't receive a msg - %% then we will get here and need a sensible - %% value for Time1, otherwise we crash. - %% R13B1 always waits infinitely when waking - %% from hibernation, so that's what we do - %% here too. - {infinity, false, GS2State0}; - _ -> {Time, false, GS2State0} - end, - receive - Input -> - %% Time could be 'hibernate' here, so *don't* call loop - process_next_msg( - drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) - after Time1 -> - case HibOnTimeout of - true -> - pre_hibernate( - GS2State #gs2_state { queue = Queue1 }); - false -> - process_msg(timeout, - GS2State #gs2_state { queue = Queue1 }) - end - end - end. -wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> - TimeoutState1 = case TS of - undefined -> - undefined; - {SleptAt, TimeoutState} -> - adjust_timeout_state(SleptAt, - erlang:monotonic_time(), - TimeoutState) - end, - post_hibernate( - drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). - -hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> - TS = case TimeoutState of - undefined -> undefined; - {backoff, _, _, _, _} -> {erlang:monotonic_time(), - TimeoutState} - end, - proc_lib:hibernate(?MODULE, wake_hib, - [GS2State #gs2_state { timeout_state = TS }]). - -pre_hibernate(GS2State0 = #gs2_state { state = State, - mod = Mod, - emit_stats_fun = EmitStatsFun }) -> - GS2State = EmitStatsFun(stop_stats_timer(GS2State0)), - case erlang:function_exported(Mod, handle_pre_hibernate, 1) of - true -> - case catch Mod:handle_pre_hibernate(State) of - {hibernate, NState} -> - hibernate(GS2State #gs2_state { state = NState } ); - Reply -> - handle_common_termination(Reply, pre_hibernate, GS2State) - end; - false -> - hibernate(GS2State) - end. +loop(Parent, Name, State, Mod, {continue, Continue} = Msg, HibernateAfterTimeout, Debug) -> + Reply = try_dispatch(Mod, handle_continue, Continue, State), + case Debug of + [] -> + handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, + HibernateAfterTimeout, State); + _ -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, Msg), + handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, + HibernateAfterTimeout, State, Debug1) + end; -post_hibernate(GS2State0 = #gs2_state { state = State, - mod = Mod }) -> - GS2State = ensure_stats_timer(GS2State0), - case erlang:function_exported(Mod, handle_post_hibernate, 1) of - true -> - case catch Mod:handle_post_hibernate(State) of - {noreply, NState} -> - process_next_msg(GS2State #gs2_state { state = NState, - time = infinity }); - {noreply, NState, Time} -> - process_next_msg(GS2State #gs2_state { state = NState, - time = Time }); - Reply -> - handle_common_termination(Reply, post_hibernate, GS2State) - end; - false -> - %% use hibernate here, not infinity. This matches - %% R13B. The key is that we should be able to get through - %% to process_msg calling sys:handle_system_msg with Time - %% still set to hibernate, iff that msg is the very msg - %% that woke us up (or the first msg we receive after - %% waking up). - process_next_msg(GS2State #gs2_state { time = hibernate }) +loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, HibernateAfterTimeout, Debug]); + +loop(Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug) -> + receive + Msg -> + decode_msg(Msg, Parent, Name, State, Mod, infinity, HibernateAfterTimeout, Debug, false) + after HibernateAfterTimeout -> + loop(Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug) + end; + +loop(Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug) -> + Msg = receive + Input -> + Input + after Time -> + timeout + end, + decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, false). + +wake_hib(Parent, Name, State, Mod, HibernateAfterTimeout, Debug) -> + Msg = receive + Input -> + Input + end, + decode_msg(Msg, Parent, Name, State, Mod, hibernate, HibernateAfterTimeout, Debug, true). + +decode_msg(Msg, Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug, Hib) -> + case Msg of + {system, From, Req} -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, HibernateAfterTimeout], Hib); + {'EXIT', Parent, Reason} -> + terminate(Reason, ?STACKTRACE(), Name, undefined, Msg, Mod, State, Debug); + _Msg when Debug =:= [] -> + handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout); + _Msg -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, + Name, {in, Msg}), + handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout, Debug1) end. -adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, - DesiredHibPeriod, RandomState}) -> - NapLengthMicros = erlang:convert_time_unit(AwokeAt - SleptAt, - native, micro_seconds), - CurrentMicros = CurrentTO * 1000, - MinimumMicros = MinimumTO * 1000, - DesiredHibMicros = DesiredHibPeriod * 1000, - GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, - Base = - %% If enough time has passed between the last two messages then we - %% should consider sleeping sooner. Otherwise stay awake longer. - case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of - true -> lists:max([MinimumTO, CurrentTO div 2]); - false -> CurrentTO - end, - {Extra, RandomState1} = rand:uniform_s(Base, RandomState), - CurrentTO1 = Base + Extra, - {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. - -in({'$gen_cast', Msg} = Input, - GS2State = #gs2_state { prioritisers = {_, F, _} }) -> - in(Input, F(Msg, GS2State), GS2State); -in({'$gen_call', From, Msg} = Input, - GS2State = #gs2_state { prioritisers = {F, _, _} }) -> - in(Input, F(Msg, From, GS2State), GS2State); -in({'$with_state', _From, _Fun} = Input, GS2State) -> - in(Input, 0, GS2State); -in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> - in(Input, infinity, GS2State); -in({system, _From, _Req} = Input, GS2State) -> - in(Input, infinity, GS2State); -in(emit_gen_server2_stats, GS2State = #gs2_state{ emit_stats_fun = EmitStatsFun}) -> - next_stats_timer(EmitStatsFun(GS2State)); -in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> - in(Input, F(Input, GS2State), GS2State). - -in(_Input, drop, GS2State) -> - GS2State; - -in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> - GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. - -process_msg({system, From, Req}, - GS2State = #gs2_state { parent = Parent, debug = Debug }) -> - case Req of - %% This clause will match only in R16B03. - %% Since 17.0 replace_state is not a system message. - {replace_state, StateFun} -> - GS2State1 = StateFun(GS2State), - _ = gen:reply(From, GS2State1), - system_continue(Parent, Debug, GS2State1); - _ -> - %% gen_server puts Hib on the end as the 7th arg, but that version - %% of the fun seems not to be documented so leaving out for now. - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State) - end; -process_msg({'$with_state', From, Fun}, - GS2State = #gs2_state{state = State}) -> - reply(From, catch Fun(State)), - loop(GS2State); -process_msg({'EXIT', Parent, Reason} = Msg, - GS2State = #gs2_state { parent = Parent }) -> - terminate(Reason, Msg, GS2State); -process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> - handle_msg(Msg, GS2State); -process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) -> - Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), - handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }). - %%% --------------------------------------------------- -%%% Send/recive functions +%%% Send/receive functions %%% --------------------------------------------------- +do_send(Dest, Msg) -> + try erlang:send(Dest, Msg) + catch + error:_ -> ok + end, + ok. +do_multi_call([Node], Name, Req, infinity) when Node =:= node() -> + % Special case when multi_call is used with local node only. + % In that case we can leverage the benefit of recv_mark optimisation + % existing in simple gen:call. + try gen:call(Name, '$gen_call', Req, infinity) of + {ok, Res} -> {[{Node, Res}],[]} + catch exit:_ -> + {[], [Node]} + end; do_multi_call(Nodes, Name, Req, infinity) -> Tag = make_ref(), Monitors = send_nodes(Nodes, Name, Tag, Req), @@ -842,35 +1069,35 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Tag = make_ref(), Caller = self(), Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The synchronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), + spawn( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), Mref = erlang:monitor(process, Receiver), Receiver ! {self(),Tag}, receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) end. send_nodes(Nodes, Name, Tag, Req) -> @@ -885,7 +1112,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors) send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> %% Skip non-atom Node send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> +send_nodes([], _Name, _Tag, _Req, Monitors) -> Monitors. %% Against old nodes: @@ -895,89 +1122,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) -> %% Against contemporary nodes: %% Wait for reply, server 'DOWN', or timeout from TimerId. -rec_nodes(Tag, Nodes, Name, TimerId) -> +rec_nodes(Tag, Nodes, Name, TimerId) -> rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + erlang:demonitor(R, [flush]), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + erlang:demonitor(R, [flush]), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end end; rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok end, {Replies, Badnodes}. %% Collect all replies that already have arrived rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + erlang:demonitor(R, [flush]), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + erlang:demonitor(R, [flush]), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> {Replies, Badnodes}. @@ -989,431 +1216,614 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end end. -%% Cancels a monitor started with Ref=erlang:monitor(_, _). -unmonitor(Ref) when is_reference(Ref) -> - erlang:demonitor(Ref), - receive - {'DOWN', Ref, _, _, _} -> - true - after 0 -> - true +%% --------------------------------------------------- +%% Helper functions for try-catch of callbacks. +%% Returns the return value of the callback, or +%% {'EXIT', Class, Reason, Stack} (if an exception occurs) +%% +%% The Class, Reason and Stack are given to erlang:raise/3 +%% to make sure proc_lib receives the proper reasons and +%% stacktraces. +%% --------------------------------------------------- + +try_dispatch({'$gen_cast', Msg}, Mod, State) -> + try_dispatch(Mod, handle_cast, Msg, State); +try_dispatch(Info, Mod, State) -> + try_dispatch(Mod, handle_info, Info, State). + +try_dispatch(Mod, Func, Msg, State) -> + try + {ok, Mod:Func(Msg, State)} + catch + throw:R -> + {ok, R}; + error:undef = R:Stacktrace when Func == handle_info -> + case erlang:function_exported(Mod, handle_info, 2) of + false -> + ?LOG_WARNING( + #{label=>{gen_server,no_handle_info}, + module=>Mod, + message=>Msg}, + #{domain=>[otp], + report_cb=>fun gen_server:format_log/2, + error_logger=> + #{tag=>warning_msg, + report_cb=>fun gen_server:format_log/1}}), + {ok, {noreply, State}}; + true -> + {'EXIT', error, R, Stacktrace} + end; + Class:R:Stacktrace -> + {'EXIT', Class, R, Stacktrace} end. +try_handle_call(Mod, Msg, From, State) -> + try + {ok, Mod:handle_call(Msg, From, State)} + catch + throw:R -> + {ok, R}; + Class:R:Stacktrace -> + {'EXIT', Class, R, Stacktrace} + end. + +try_terminate(Mod, Reason, State) -> + case erlang:function_exported(Mod, terminate, 2) of + true -> + try + {ok, Mod:terminate(Reason, State)} + catch + throw:R -> + {ok, R}; + Class:R:Stacktrace -> + {'EXIT', Class, R, Stacktrace} + end; + false -> + {ok, ok} + end. + + %%% --------------------------------------------------- %%% Message handling functions %%% --------------------------------------------------- -dispatch({'$gen_cast', Msg}, Mod, State) -> - Mod:handle_cast(Msg, State); -dispatch(Info, Mod, State) -> - Mod:handle_info(Info, State). - -common_reply(_Name, From, Reply, _NState, [] = _Debug) -> - reply(From, Reply), - []; -common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) -> - reply(From, Reply), - sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}). - -common_noreply(_Name, _NState, [] = _Debug) -> - []; -common_noreply(Name, NState, Debug) -> - sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}). - -common_become(_Name, _Mod, _NState, [] = _Debug) -> - []; -common_become(Name, Mod, NState, Debug) -> - sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}). - -handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, - state = State, - name = Name, - debug = Debug }) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = common_reply(Name, From, Reply, NState, Debug), - loop(GS2State #gs2_state { state = NState, - time = infinity, - debug = Debug1 }); - {reply, Reply, NState, Time1} -> - Debug1 = common_reply(Name, From, Reply, NState, Debug), - loop(GS2State #gs2_state { state = NState, - time = Time1, - debug = Debug1}); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Msg, - GS2State #gs2_state { state = NState })), - _ = common_reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Msg, GS2State) +handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, HibernateAfterTimeout) -> + Result = try_handle_call(Mod, Msg, From, State), + case Result of + {ok, {reply, Reply, NState}} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []); + {ok, {reply, Reply, NState, TimeoutOrHibernate}} + when ?is_timeout(TimeoutOrHibernate); + TimeoutOrHibernate =:= hibernate -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, TimeoutOrHibernate, HibernateAfterTimeout, []); + {ok, {reply, Reply, NState, {continue, _}=Continue}} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Continue, HibernateAfterTimeout, []); + {ok, {stop, Reason, Reply, NState}} -> + try + terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, []) + after + reply(From, Reply) + end; + Other -> handle_common_reply(Other, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State) + end; +handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout) -> + Reply = try_dispatch(Msg, Mod, State), + handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, HibernateAfterTimeout, State). + +handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, HibernateAfterTimeout, Debug) -> + Result = try_handle_call(Mod, Msg, From, State), + case Result of + {ok, {reply, Reply, NState}} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, Debug1); + {ok, {reply, Reply, NState, TimeoutOrHibernate}} + when ?is_timeout(TimeoutOrHibernate); + TimeoutOrHibernate =:= hibernate -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, TimeoutOrHibernate, HibernateAfterTimeout, Debug1); + {ok, {reply, Reply, NState, {continue, _}=Continue}} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Continue, HibernateAfterTimeout, Debug1); + {ok, {stop, Reason, Reply, NState}} -> + try + terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, Debug) + after + _ = reply(Name, From, Reply, NState, Debug) + end; + Other -> + handle_common_reply(Other, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State, Debug) end; -handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Msg, GS2State). +handle_msg(Msg, Parent, Name, State, Mod, HibernateAfterTimeout, Debug) -> + Reply = try_dispatch(Msg, Mod, State), + handle_common_reply(Reply, Parent, Name, undefined, Msg, Mod, HibernateAfterTimeout, State, Debug). -handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, - debug = Debug}) -> +handle_common_reply(Reply, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State) -> case Reply of - {noreply, NState} -> - Debug1 = common_noreply(Name, NState, Debug), - loop(GS2State #gs2_state {state = NState, - time = infinity, - debug = Debug1}); - {noreply, NState, Time1} -> - Debug1 = common_noreply(Name, NState, Debug), - loop(GS2State #gs2_state {state = NState, - time = Time1, - debug = Debug1}); - {become, Mod, NState} -> - Debug1 = common_become(Name, Mod, NState, Debug), - loop(find_prioritisers( - GS2State #gs2_state { mod = Mod, - state = NState, - time = infinity, - debug = Debug1 })); - {become, Mod, NState, Time1} -> - Debug1 = common_become(Name, Mod, NState, Debug), - loop(find_prioritisers( - GS2State #gs2_state { mod = Mod, - state = NState, - time = Time1, - debug = Debug1 })); - _ -> - handle_common_termination(Reply, Msg, GS2State) + {ok, {noreply, NState}} -> + loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, []); + {ok, {noreply, NState, TimeoutOrHibernate}} + when ?is_timeout(TimeoutOrHibernate); + TimeoutOrHibernate =:= hibernate -> + loop(Parent, Name, NState, Mod, TimeoutOrHibernate, HibernateAfterTimeout, []); + {ok, {noreply, NState, {continue, _}=Continue}} -> + loop(Parent, Name, NState, Mod, Continue, HibernateAfterTimeout, []); + {ok, {stop, Reason, NState}} -> + terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, []); + {'EXIT', Class, Reason, Stacktrace} -> + terminate(Class, Reason, Stacktrace, Name, From, Msg, Mod, State, []); + {ok, BadReply} -> + terminate({bad_return_value, BadReply}, ?STACKTRACE(), Name, From, Msg, Mod, State, []) end. -handle_common_termination(Reply, Msg, GS2State) -> +handle_common_reply(Reply, Parent, Name, From, Msg, Mod, HibernateAfterTimeout, State, Debug) -> case Reply of - {stop, Reason, NState} -> - terminate(Reason, Msg, GS2State #gs2_state { state = NState }); - {'EXIT', What} -> - terminate(What, Msg, GS2State); - _ -> - terminate({bad_return_value, Reply}, Msg, GS2State) + {ok, {noreply, NState}} -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, HibernateAfterTimeout, Debug1); + {ok, {noreply, NState, TimeoutOrHibernate}} + when ?is_timeout(TimeoutOrHibernate); + TimeoutOrHibernate =:= hibernate -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}), + loop(Parent, Name, NState, Mod, TimeoutOrHibernate, HibernateAfterTimeout, Debug1); + {ok, {noreply, NState, {continue, _}=Continue}} -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}), + loop(Parent, Name, NState, Mod, Continue, HibernateAfterTimeout, Debug1); + {ok, {stop, Reason, NState}} -> + terminate(Reason, ?STACKTRACE(), Name, From, Msg, Mod, NState, Debug); + {'EXIT', Class, Reason, Stacktrace} -> + terminate(Class, Reason, Stacktrace, Name, From, Msg, Mod, State, Debug); + {ok, BadReply} -> + terminate({bad_return_value, BadReply}, ?STACKTRACE(), Name, From, Msg, Mod, State, Debug) end. +reply(Name, From, Reply, State, Debug) -> + reply(From, Reply), + sys:handle_debug(Debug, fun print_event/3, Name, + {out, Reply, From, State} ). + + %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, GS2State) -> - loop(GS2State #gs2_state { parent = Parent, debug = Debug }). +system_continue(Parent, Debug, [Name, State, Mod, Time, HibernateAfterTimeout]) -> + loop(Parent, Name, State, Mod, Time, HibernateAfterTimeout, Debug). -system_terminate(Reason, _Parent, Debug, GS2State) -> - terminate(Reason, [], GS2State #gs2_state { debug = Debug }). +-spec system_terminate(_, _, _, [_]) -> no_return(). -system_code_change(GS2State = #gs2_state { mod = Mod, - state = State }, - _Module, OldVsn, Extra) -> +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _HibernateAfterTimeout]) -> + terminate(Reason, ?STACKTRACE(), Name, undefined, [], Mod, State, Debug). + +system_code_change([Name, State, Mod, Time, HibernateAfterTimeout], _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - NewGS2State = find_prioritisers( - GS2State #gs2_state { state = NewState }), - {ok, [NewGS2State]}; - Else -> - Else + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, HibernateAfterTimeout]}; + Else -> Else end. +system_get_state([_Name, State, _Mod, _Time, _HibernateAfterTimeout]) -> + {ok, State}. + +system_replace_state(StateFun, [Name, State, Mod, Time, HibernateAfterTimeout]) -> + NState = StateFun(State), + {ok, NState, [Name, NState, Mod, Time, HibernateAfterTimeout]}. + %%----------------------------------------------------------------- %% Format debug messages. Print them as the call-back module sees %% them, not as the real erlang messages. Use trace for that. %%----------------------------------------------------------------- print_event(Dev, {in, Msg}, Name) -> case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~tp got call ~tp from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~tp got cast ~tp~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~tp got ~tp~n", [Name, Msg]) + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~tp got call ~tp from ~tw~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~tp got cast ~tp~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~tp got ~tp~n", [Name, Msg]) end; -print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~tp sent ~tp to ~w, new state ~w~n", - [Name, Msg, To, State]); +print_event(Dev, {out, Msg, {To,_Tag}, State}, Name) -> + io:format(Dev, "*DBG* ~tp sent ~tp to ~tw, new state ~tp~n", + [Name, Msg, To, State]); print_event(Dev, {noreply, State}, Name) -> - io:format(Dev, "*DBG* ~tp new state ~w~n", [Name, State]); + io:format(Dev, "*DBG* ~tp new state ~tp~n", [Name, State]); print_event(Dev, Event, Name) -> io:format(Dev, "*DBG* ~tp dbg ~tp~n", [Name, Event]). %%% --------------------------------------------------- %%% Terminate the server. +%%% +%%% terminate/8 is triggered by {stop, Reason} or bad +%%% return values. The stacktrace is generated via the +%%% ?STACKTRACE() macro and the ReportReason must not +%%% be wrapped in tuples. +%%% +%%% terminate/9 is triggered in case of error/exit in +%%% the user callback. In this case the report reason +%%% always includes the user stacktrace. +%%% +%%% The reason received in the terminate/2 callbacks +%%% always includes the stacktrace for errors and never +%%% for exits. %%% --------------------------------------------------- --spec terminate(_, _, _) -> no_return(). - -terminate(Reason, Msg, #gs2_state { name = Name, - mod = Mod, - state = State, - debug = Debug, - stop_stats_fun = StopStatsFun - } = GS2State) -> - StopStatsFun(stop_stats_timer(GS2State)), - case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Reason, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); +-spec terminate(_, _, _, _, _, _, _, _) -> no_return(). +terminate(Reason, Stacktrace, Name, From, Msg, Mod, State, Debug) -> + terminate(exit, Reason, Stacktrace, false, Name, From, Msg, Mod, State, Debug). + +-spec terminate(_, _, _, _, _, _, _, _, _) -> no_return(). +terminate(Class, Reason, Stacktrace, Name, From, Msg, Mod, State, Debug) -> + terminate(Class, Reason, Stacktrace, true, Name, From, Msg, Mod, State, Debug). + +-spec terminate(_, _, _, _, _, _, _, _, _, _) -> no_return(). +terminate(Class, Reason, Stacktrace, ReportStacktrace, Name, From, Msg, Mod, State, Debug) -> + Reply = try_terminate(Mod, terminate_reason(Class, Reason, Stacktrace), State), + case Reply of + {'EXIT', C, R, S} -> + error_info(R, S, Name, From, Msg, Mod, State, Debug), + erlang:raise(C, R, S); + _ -> + case {Class, Reason} of + {exit, normal} -> ok; + {exit, shutdown} -> ok; + {exit, {shutdown,_}} -> ok; + _ when ReportStacktrace -> + error_info(Reason, Stacktrace, Name, From, Msg, Mod, State, Debug); _ -> - error_info(Reason, undefined, Name, Msg, State, Debug), - exit(Reason) - end + error_info(Reason, undefined, Name, From, Msg, Mod, State, Debug) + end + end, + case Stacktrace of + [] -> + erlang:Class(Reason); + _ -> + erlang:raise(Class, Reason, Stacktrace) end. -error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> +terminate_reason(error, Reason, Stacktrace) -> {Reason, Stacktrace}; +terminate_reason(exit, Reason, _Stacktrace) -> Reason. + +error_info(_Reason, _ST, application_controller, _From, _Msg, _Mod, _State, _Debug) -> %% OTP-5811 Don't send an error report if it's the system process %% application_controller which is terminating - let init take care %% of it instead ok; -error_info(Reason, RootCause, Name, Msg, State, Debug) -> - Reason1 = error_reason(Reason), - Fmt = - "** Generic server ~tp terminating~n" - "** Last message in was ~tp~n" - "** When Server state == ~tp~n" - "** Reason for termination == ~n** ~tp~n", - case RootCause of - undefined -> format(Fmt, [Name, Msg, State, Reason1]); - _ -> format(Fmt ++ "** In 'terminate' callback " - "with reason ==~n** ~tp~n", - [Name, Msg, State, Reason1, - error_reason(RootCause)]) - end, - sys:print_log(Debug), +error_info(Reason, ST, Name, From, Msg, Mod, State, Debug) -> + Log = sys:get_log(Debug), + Status = + gen:format_status(Mod, terminate, + #{ reason => Reason, + state => State, + message => Msg, + log => Log }, + [get(),State]), + ReportReason = + if ST == undefined -> + %% When ST is undefined, it should not be included in the + %% reported reason for the crash as it is then caused + %% by an invalid return from a callback and thus thus the + %% stacktrace is irrelevant. + maps:get(reason, Status); + true -> + {maps:get(reason, Status), ST} + end, + + ?LOG_ERROR(#{label=>{gen_server,terminate}, + name=>Name, + last_message=>maps:get(message,Status), + state=>maps:get('EXIT',Status,maps:get('$status',Status,maps:get(state,Status))), + log=>format_log_state(Mod,maps:get(log,Status)), + reason=>ReportReason, + client_info=>client_stacktrace(From)}, + #{domain=>[otp], + report_cb=>fun gen_server:format_log/2, + error_logger=>#{tag=>error, + report_cb=>fun gen_server:format_log/1}}), ok. -error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> - case code:is_loaded(M) of - false -> {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> case erlang:function_exported(M, F, length(A)) of - true -> Reason; - false -> {'function not exported',[{M,F,A}|MFAs]} - end +client_stacktrace(undefined) -> + undefined; +client_stacktrace({From,_Tag}) -> + client_stacktrace(From); +client_stacktrace(From) when is_pid(From), node(From) =:= node() -> + case process_info(From, [current_stacktrace, registered_name]) of + undefined -> + {From,dead}; + [{current_stacktrace, Stacktrace}, {registered_name, []}] -> + {From,{From,Stacktrace}}; + [{current_stacktrace, Stacktrace}, {registered_name, Name}] -> + {From,{Name,Stacktrace}} end; -error_reason(Reason) -> - Reason. - -%%% --------------------------------------------------- -%%% Misc. functions. -%%% --------------------------------------------------- - -opt(Op, [{Op, Value}|_]) -> - {ok, Value}; -opt(Op, [_|Options]) -> - opt(Op, Options); -opt(_, []) -> - false. - -debug_options(Name, Opts) -> - case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) - end. - -dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> +client_stacktrace(From) when is_pid(From) -> + {From,remote}. + + +%% format_log/1 is the report callback used by Logger handler +%% error_logger only. It is kept for backwards compatibility with +%% legacy error_logger event handlers. This function must always +%% return {Format,Args} compatible with the arguments in this module's +%% calls to error_logger prior to OTP-21.0. +format_log(Report) -> + Depth = error_logger:get_format_depth(), + FormatOpts = #{chars_limit => unlimited, + depth => Depth, + single_line => false, + encoding => utf8}, + format_log_multi(limit_report(Report,Depth),FormatOpts). + +limit_report(Report,unlimited) -> + Report; +limit_report(#{label:={gen_server,terminate}, + last_message:=Msg, + state:=State, + log:=Log, + reason:=Reason, + client_info:=Client}=Report, + Depth) -> + Report#{last_message=>io_lib:limit_term(Msg,Depth), + state=>io_lib:limit_term(State,Depth), + log=>[io_lib:limit_term(L,Depth)||L<-Log], + reason=>io_lib:limit_term(Reason,Depth), + client_info=>limit_client_report(Client,Depth)}; +limit_report(#{label:={gen_server,no_handle_info}, + message:=Msg}=Report,Depth) -> + Report#{message=>io_lib:limit_term(Msg,Depth)}. + +limit_client_report({From,{Name,Stacktrace}},Depth) -> + {From,{Name,io_lib:limit_term(Stacktrace,Depth)}}; +limit_client_report(Client,_) -> + Client. + +%% format_log/2 is the report callback for any Logger handler, except +%% error_logger. +format_log(Report, FormatOpts0) -> + Default = #{chars_limit => unlimited, + depth => unlimited, + single_line => false, + encoding => utf8}, + FormatOpts = maps:merge(Default,FormatOpts0), + IoOpts = + case FormatOpts of + #{chars_limit:=unlimited} -> []; + #{chars_limit:=Limit} -> + [{chars_limit,Limit}] + end, + {Format,Args} = format_log_single(Report, FormatOpts), + io_lib:format(Format, Args, IoOpts). + +format_log_single(#{label:={gen_server,terminate}, + name:=Name, + last_message:=Msg, + state:=State, + log:=Log, + reason:=Reason, + client_info:=Client}, + #{single_line:=true,depth:=Depth}=FormatOpts) -> + P = p(FormatOpts), + Format1 = lists:append(["Generic server ",P," terminating. Reason: ",P, + ". Last message: ", P, ". State: ",P,"."]), + {ServerLogFormat,ServerLogArgs} = format_server_log_single(Log,FormatOpts), + {ClientLogFormat,ClientLogArgs} = format_client_log_single(Client,FormatOpts), + + Args1 = + case Depth of + unlimited -> + [Name,fix_reason(Reason),Msg,State]; _ -> - [log, statistics] + [Name,Depth,fix_reason(Reason),Depth,Msg,Depth,State,Depth] end, - dbg_opts(Name, Opts); -dbg_options(Name, Opts) -> - dbg_opts(Name, Opts). - -dbg_opts(Name, Opts) -> - case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~tp: ignoring erroneous debug options - ~tp~n", - [Name, Opts]), - []; - Dbg -> - Dbg - end. + {Format1++ServerLogFormat++ClientLogFormat, + Args1++ServerLogArgs++ClientLogArgs}; +format_log_single(#{label:={gen_server,no_handle_info}, + module:=Mod, + message:=Msg}, + #{single_line:=true,depth:=Depth}=FormatOpts) -> + P = p(FormatOpts), + Format = lists:append(["Undefined handle_info in ",P, + ". Unhandled message: ",P,"."]), + Args = + case Depth of + unlimited -> + [Mod,Msg]; + _ -> + [Mod,Depth,Msg,Depth] + end, + {Format,Args}; +format_log_single(Report,FormatOpts) -> + format_log_multi(Report,FormatOpts). + +format_log_multi(#{label:={gen_server,terminate}, + name:=Name, + last_message:=Msg, + state:=State, + log:=Log, + reason:=Reason, + client_info:=Client}, + #{depth:=Depth}=FormatOpts) -> + Reason1 = fix_reason(Reason), + {ClientFmt,ClientArgs} = format_client_log(Client,FormatOpts), + P = p(FormatOpts), + Format = + lists:append( + ["** Generic server ",P," terminating \n" + "** Last message in was ",P,"~n" + "** When Server state == ",P,"~n" + "** Reason for termination ==~n** ",P,"~n"] ++ + case Log of + [] -> []; + _ -> ["** Log ==~n** ["| + lists:join(",~n ",lists:duplicate(length(Log),P))]++ + ["]~n"] + end) ++ ClientFmt, + Args = + case Depth of + unlimited -> + [Name, Msg, State, Reason1] ++ Log ++ ClientArgs; + _ -> + [Name, Depth, Msg, Depth, State, Depth, Reason1, Depth] ++ + case Log of + [] -> []; + _ -> lists:flatmap(fun(L) -> [L, Depth] end, Log) + end ++ ClientArgs + end, + {Format,Args}; +format_log_multi(#{label:={gen_server,no_handle_info}, + module:=Mod, + message:=Msg}, + #{depth:=Depth}=FormatOpts) -> + P = p(FormatOpts), + Format = + "** Undefined handle_info in ~p~n" + "** Unhandled message: "++P++"~n", + Args = + case Depth of + unlimited -> + [Mod,Msg]; + _ -> + [Mod,Msg,Depth] + end, + {Format,Args}. -get_proc_name(Pid) when is_pid(Pid) -> - Pid; -get_proc_name({local, Name}) -> - case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) +fix_reason({undef,[{M,F,A,L}|MFAs]}=Reason) -> + case code:is_loaded(M) of + false -> + {'module could not be loaded',[{M,F,A,L}|MFAs]}; + _ -> + case erlang:function_exported(M, F, length(A)) of + true -> + Reason; + false -> + {'function not exported',[{M,F,A,L}|MFAs]} + end end; -get_proc_name({global, Name}) -> - case whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) - end. +fix_reason(Reason) -> + Reason. -get_parent() -> - case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> - Parent; - [Parent | _] when is_atom(Parent)-> - name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) - end. +format_server_log_single([],_) -> + {"",[]}; +format_server_log_single(Log,FormatOpts) -> + Args = + case maps:get(depth,FormatOpts) of + unlimited -> + [Log]; + Depth -> + [Log, Depth] + end, + {" Log: "++p(FormatOpts),Args}. + +format_client_log_single(undefined,_) -> + {"",[]}; +format_client_log_single({From,dead},_) -> + {" Client ~0p is dead.",[From]}; +format_client_log_single({From,remote},_) -> + {" Client ~0p is remote on node ~0p.", [From, node(From)]}; +format_client_log_single({_From,{Name,Stacktrace0}},FormatOpts) -> + P = p(FormatOpts), + %% Minimize the stacktrace a bit for single line reports. This is + %% hopefully enough to point out the position. + Stacktrace = lists:sublist(Stacktrace0,4), + Args = + case maps:get(depth,FormatOpts) of + unlimited -> + [Name, Stacktrace]; + Depth -> + [Name, Depth, Stacktrace, Depth] + end, + {" Client "++P++" stacktrace: "++P++".", Args}. + +format_client_log(undefined,_) -> + {"", []}; +format_client_log({From,dead},_) -> + {"** Client ~p is dead~n", [From]}; +format_client_log({From,remote},_) -> + {"** Client ~p is remote on node ~p~n", [From, node(From)]}; +format_client_log({_From,{Name,Stacktrace}},FormatOpts) -> + P = p(FormatOpts), + Format = lists:append(["** Client ",P," stacktrace~n", + "** ",P,"~n"]), + Args = + case maps:get(depth,FormatOpts) of + unlimited -> + [Name, Stacktrace]; + Depth -> + [Name, Depth, Stacktrace, Depth] + end, + {Format,Args}. -name_to_pid(Name) -> - case whereis(Name) of - undefined -> - case whereis_name(Name) of - undefined -> - exit(could_not_find_registered_name); - Pid -> - Pid - end; - Pid -> - Pid - end. +p(#{single_line:=Single,depth:=Depth,encoding:=Enc}) -> + "~"++single(Single)++mod(Enc)++p(Depth); +p(unlimited) -> + "p"; +p(_Depth) -> + "P". -whereis_name(Name) -> - case ets:lookup(global_names, Name) of - [{_Name, Pid, _Method, _RPid, _Ref}] -> - if node(Pid) == node() -> - case is_process_alive(Pid) of - true -> Pid; - false -> undefined - end; - true -> - Pid - end; - [] -> undefined - end. +single(true) -> "0"; +single(false) -> "". -find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> - PCall = function_exported_or_default(Mod, 'prioritise_call', 4, - fun (_Msg, _From, _State) -> 0 end), - PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, - fun (_Msg, _State) -> 0 end), - PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, - fun (_Msg, _State) -> 0 end), - GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. - -function_exported_or_default(Mod, Fun, Arity, Default) -> - case erlang:function_exported(Mod, Fun, Arity) of - true -> case Arity of - 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, - state = State }) -> - Length = priority_queue:len(Queue), - case catch Mod:Fun(Msg, Length, State) of - drop -> - drop; - Res when is_integer(Res) -> - Res; - Err -> - handle_common_termination(Err, Msg, GS2State) - end - end; - 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, - state = State }) -> - Length = priority_queue:len(Queue), - case catch Mod:Fun(Msg, From, Length, State) of - Res when is_integer(Res) -> - Res; - Err -> - handle_common_termination(Err, Msg, GS2State) - end - end - end; - false -> Default - end. +mod(latin1) -> ""; +mod(_) -> "t". %%----------------------------------------------------------------- %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, - #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] = - StatusData, - NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, - Header = lists:concat(["Status for generic server ", NameTag]), - Log = sys:get_log(Debug), - Specfic = callback(Mod, format_status, [Opt, [PDict, State]], - fun () -> [{data, [{"State", State}]}] end), - Messages = callback(Mod, format_message_queue, [Opt, Queue], - fun () -> priority_queue:to_list(Queue) end), + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, _HibernateAfterTimeout]] = StatusData, + Header = gen:format_status_header("Status for generic server", Name), + Status = + case gen:format_status(Mod, Opt, #{ state => State, log => sys:get_log(Debug) }, + [PDict, State]) of + #{ 'EXIT' := R } = M -> + M#{ '$status' => [{data,[{"State",R}]}] }; + %% Status is set when the old format_status/2 is called, + %% so we do a little backwards compatibility dance here + #{ '$status' := S } = M when is_list(S) -> M; + #{ '$status' := S } = M -> M#{ '$status' := [S] }; + #{ state := S } = M -> + M#{ '$status' => [{data, [{"State",S}] }] } + end, [{header, Header}, {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, - {"Queued messages", Messages}]} | - Specfic]. - -callback(Mod, FunName, Args, DefaultThunk) -> - case erlang:function_exported(Mod, FunName, length(Args)) of - true -> case catch apply(Mod, FunName, Args) of - {'EXIT', _} -> DefaultThunk(); - Success -> Success - end; - false -> DefaultThunk() - end. - -stats_funs() -> - case ets:info(gen_server2_metrics) of - undefined -> - {fun(GS2State) -> GS2State end, - fun(GS2State) -> GS2State end}; - _ -> - {fun emit_stats/1, fun stop_stats/1} + {"Parent", Parent}, + {"Logged events", format_log_state(Mod, maps:get(log,Status))}]} | + maps:get('$status',Status)]. + +format_log_state(Mod, Log) -> + %% If format_status/1 was exported, the log has already been handled by + %% that call, so we should not pass all log events into the callback again. + case erlang:function_exported(Mod, format_status, 1) of + false -> + [case Event of + {out,Msg,From,State} -> + Status = gen:format_status( + Mod, terminate, #{ state => State }, + [get(), State]), + {out, Msg, From, maps:get(state, Status) }; + {noreply,State} -> + Status = gen:format_status( + Mod, terminate, #{ state => State }, + [get(), State]), + {noreply, maps:get(state, Status)}; + _ -> Event + end || Event <- Log]; + true -> + Log end. - -init_stats(State = #gs2_state{ emit_stats_fun = EmitStatsFun }) -> - StateWithInitTimer = rabbit_event:init_stats_timer(State, #gs2_state.timer), - next_stats_timer(EmitStatsFun(StateWithInitTimer)). - -next_stats_timer(State) -> - ensure_stats_timer(rabbit_event:reset_stats_timer(State, #gs2_state.timer)). - -ensure_stats_timer(State) -> - rabbit_event:ensure_stats_timer(State, - #gs2_state.timer, - emit_gen_server2_stats). - -stop_stats_timer(State) -> - rabbit_event:stop_stats_timer(State, #gs2_state.timer). - -emit_stats(State = #gs2_state{queue = Queue}) -> - rabbit_core_metrics:gen_server2_stats(self(), priority_queue:len(Queue)), - State. - -stop_stats(State) -> - rabbit_core_metrics:gen_server2_deleted(self()), - State. |