summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl170
1 files changed, 119 insertions, 51 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 4e1dcd2e..0331ca01 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,23 +18,32 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2,
+ monitor/2, demonitor/1, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors, name}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), pid()}).
+-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
--spec(invoke/2 ::
- ( pid(), fun ((pid()) -> A)) -> A;
- ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
- [{pid(), term()}]}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A;
+ ([pid()], fun_or_mfa(A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -50,26 +59,27 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
-invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- Fun(Pid);
-invoke(Pid, Fun) when is_pid(Pid) ->
- case invoke([Pid], Fun) of
+invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ apply1(FunOrMFA, Pid);
+invoke(Pid, FunOrMFA) when is_pid(Pid) ->
+ case invoke([Pid], FunOrMFA) of
{[{Pid, Result}], []} ->
Result;
{[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
-invoke([], _Fun) -> %% optimisation
+invoke([], _FunOrMFA) -> %% optimisation
{[], []};
-invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, Fun) of
+invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
{ok, _, Result} -> {[{Pid, Result}], []};
{error, _, Error} -> {[], [{Pid, Error}]}
end;
-invoke(Pids, Fun) when is_list(Pids) ->
+invoke(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
%% infinity, and thus there is no process spawned in order to do
@@ -78,45 +88,58 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped}, infinity)
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, FunOrMFA, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
[Results || {_Node, Results} <- Replies]]),
lists:foldl(
fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- safe_invoke(Pid, Fun), %% we don't care about any error
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, FunOrMFA), %% we don't care about any error
ok;
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result([Pid], Fun);
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
+ invoke_no_result([Pid], FunOrMFA);
-invoke_no_result([], _Fun) -> %% optimisation
+invoke_no_result([], _FunOrMFA) -> %% optimisation
ok;
-invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- safe_invoke(Pid, Fun), %% must not die
+invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, FunOrMFA), %% must not die
ok;
-invoke_no_result(Pids, Fun) when is_list(Pids) ->
+invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, FunOrMFA, Grouped})
end,
- safe_invoke(LocalPids, Fun), %% must not die
+ safe_invoke(LocalPids, FunOrMFA), %% must not die
ok.
+monitor(process, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(process, Pid);
+monitor(process, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ gen_server2:cast(Name, {monitor, self(), Pid}),
+ {Name, Pid}.
+
+demonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref);
+demonitor({Name, Pid}) ->
+ gen_server2:cast(Name, {demonitor, self(), Pid}).
+
call(PidOrPids, Msg) ->
- invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+ invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+ invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
%%----------------------------------------------------------------------------
@@ -134,43 +157,88 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
end.
-safe_invoke(Pids, Fun) when is_list(Pids) ->
- [safe_invoke(Pid, Fun) || Pid <- Pids];
-safe_invoke(Pid, Fun) when is_pid(Pid) ->
+safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
+ [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
+safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
try
- {ok, Pid, Fun(Pid)}
+ {ok, Pid, apply1(FunOrMFA, Pid)}
catch Class:Reason ->
{error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
+apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
+apply1(Fun, Arg) -> Fun(Arg).
+
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, node(), hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-
-handle_cast({invoke, Fun, Grouped}, Node) ->
- safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, Node, hibernate}.
-
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
+ hibernate}.
+
+handle_cast({monitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:add_element(MonitoringPid, Pids),
+ dict:store(Pid, {Ref, Pids1}, Monitors);
+ error ->
+ Ref = erlang:monitor(process, Pid),
+ Pids = gb_sets:singleton(MonitoringPid),
+ dict:store(Pid, {Ref, Pids}, Monitors)
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({demonitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:del_element(MonitoringPid, Pids),
+ case gb_sets:is_empty(Pids1) of
+ true -> erlang:demonitor(Ref),
+ dict:erase(Pid, Monitors);
+ false -> dict:store(Pid, {Ref, Pids1}, Monitors)
+ end;
+ error ->
+ Monitors
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
+ safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
+ {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply,
+ case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Msg = {'DOWN', {Name, Pid}, process, Pid, Info},
+ gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end,
+ none, Pids),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
+
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.