diff options
authorSimon MacMullen <>2013-07-23 14:33:17 +0100
committerSimon MacMullen <>2013-07-23 14:33:17 +0100
commitc433f342d1a61b5a188bb4de449127a8ddb5433f (patch)
parent3de186a95a8c81cd83b9b4c7e5a7aa5cfbc12f5f (diff)
Make monitoring via delegates async. This has the downside that you can't monitor the same pid more than once from the same process, but that is enforced by pmon anyway which is the only client of this code. The upside is that cross-cluster basic.get doesn't deadlock...
1 files changed, 29 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index dad2dd3c..30ee33b6 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -24,7 +24,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {node, monitors}).
+-record(state, {node, monitors, name}).
@@ -32,7 +32,7 @@
--type(monitor_ref() :: reference() | {atom(), reference()}).
+-type(monitor_ref() :: reference() | {atom(), pid()}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
@@ -61,7 +61,8 @@
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
@@ -128,14 +129,15 @@ monitor(Type, Pid) when node(Pid) =:= node() ->
erlang:monitor(Type, Pid);
monitor(Type, Pid) ->
Name = delegate(Pid, [node(Pid)]),
- {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}.
+ gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+ {Name, Pid}.
demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
demonitor(Ref, Options) when is_reference(Ref) ->
erlang:demonitor(Ref, Options);
-demonitor({Name, Ref}, Options) ->
- gen_server2:call(Name, {demonitor, Ref, Options}, infinity).
+demonitor({Name, Pid}, Options) ->
+ gen_server2:cast(Name, {demonitor, Pid, Options}).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -180,34 +182,39 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
-init([]) ->
- {ok, #state{node = node(), monitors = dict:new()}, hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
-handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From,
+handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(Type, ToMonitor),
- State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
- {reply, Ref, State1, hibernate};
+ Ref = erlang:monitor(Type, Pid),
+ Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ {noreply, State#state{monitors = Monitors1}, hibernate};
-handle_call({demonitor, Ref, Options}, _From,
+handle_cast({demonitor, Pid, Options},
State = #state{monitors = Monitors}) ->
- State1 = State#state{monitors = dict:erase(Ref, Monitors)},
- {reply, erlang:demonitor(Ref, Options), State1, hibernate}.
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {_WantsMonitor, Ref}} ->
+ erlang:demonitor(Ref, Options),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
{noreply, State, hibernate}.
-handle_info({'DOWN', Ref, process, Object, Info},
- State = #state{monitors = Monitors}) ->
- {noreply, case dict:find(Ref, Monitors) of
- {ok, WantsMonitor} ->
- WantsMonitor ! {'DOWN', Ref, process, Object, Info},
- State#state{monitors = dict:erase(Ref, Monitors)};
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {WantsMonitor, Ref}} ->
+ WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+ State#state{monitors = dict:erase(Pid, Monitors)};
error ->
end, hibernate};