diff options
Diffstat (limited to 'src/delegate.erl')
-rw-r--r-- | src/delegate.erl | 170 |
1 files changed, 119 insertions, 51 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 4e1dcd2e..0331ca01 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,23 +18,32 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, + monitor/2, demonitor/1, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {node, monitors, name}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([monitor_ref/0]). + +-type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}). + -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A; + ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok'). +-spec(monitor/2 :: ('process', pid()) -> monitor_ref()). +-spec(demonitor/1 :: (monitor_ref()) -> 'true'). + -spec(call/2 :: ( pid(), any()) -> any(); ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). @@ -50,26 +59,27 @@ %%---------------------------------------------------------------------------- start_link(Num) -> - gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). + Name = delegate_name(Num), + gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + apply1(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -78,45 +88,58 @@ invoke(Pids, Fun) when is_list(Pids) -> case orddict:fetch_keys(Grouped) of [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}, infinity) + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}) + RemoteNodes -> gen_server2:abcast( + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. +monitor(process, Pid) when node(Pid) =:= node() -> + erlang:monitor(process, Pid); +monitor(process, Pid) -> + Name = delegate(Pid, [node(Pid)]), + gen_server2:cast(Name, {monitor, self(), Pid}), + {Name, Pid}. + +demonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref); +demonitor({Name, Pid}) -> + gen_server2:cast(Name, {demonitor, self(), Pid}). + call(PidOrPids, Msg) -> - invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). %%---------------------------------------------------------------------------- @@ -134,43 +157,88 @@ group_pids_by_node(Pids) -> delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate(RemoteNodes) -> +delegate(Pid, RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(self(), + erlang:phash2(Pid, delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, apply1(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); +apply1(Fun, Arg) -> Fun(Arg). + %%---------------------------------------------------------------------------- -init([]) -> - {ok, node(), hibernate, +init([Name]) -> + {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, Node) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. - -handle_cast({invoke, Fun, Grouped}, Node) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), - {noreply, Node, hibernate}. - -handle_info(_Info, Node) -> - {noreply, Node, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. + +handle_cast({monitor, MonitoringPid, Pid}, + State = #state{monitors = Monitors}) -> + Monitors1 = case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Pids1 = gb_sets:add_element(MonitoringPid, Pids), + dict:store(Pid, {Ref, Pids1}, Monitors); + error -> + Ref = erlang:monitor(process, Pid), + Pids = gb_sets:singleton(MonitoringPid), + dict:store(Pid, {Ref, Pids}, Monitors) + end, + {noreply, State#state{monitors = Monitors1}, hibernate}; + +handle_cast({demonitor, MonitoringPid, Pid}, + State = #state{monitors = Monitors}) -> + Monitors1 = case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Pids1 = gb_sets:del_element(MonitoringPid, Pids), + case gb_sets:is_empty(Pids1) of + true -> erlang:demonitor(Ref), + dict:erase(Pid, Monitors); + false -> dict:store(Pid, {Ref, Pids1}, Monitors) + end; + error -> + Monitors + end, + {noreply, State#state{monitors = Monitors1}, hibernate}; + +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), + {noreply, State, hibernate}. + +handle_info({'DOWN', Ref, process, Pid, Info}, + State = #state{monitors = Monitors, name = Name}) -> + {noreply, + case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Msg = {'DOWN', {Name, Pid}, process, Pid, Info}, + gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end, + none, Pids), + State#state{monitors = dict:erase(Pid, Monitors)}; + error -> + State + end, hibernate}; + +handle_info(_Info, State) -> + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, Node, _Extra) -> - {ok, Node}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. |